diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -10,11 +10,16 @@ # 3rd party libraries without stubs (yet) +[mypy-jose.*] +ignore_missing_imports = True + +[mypy-keycloak.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True -[mypy-keycloak.*] -ignore_missing_imports = True + diff --git a/pytest.ini b/pytest.ini --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,4 @@ [pytest] +addopts = -p no:flask norecursedirs = docs .* DJANGO_SETTINGS_MODULE = swh.auth.tests.django.app.apptest.settings diff --git a/requirements-django.txt b/requirements-django.txt --- a/requirements-django.txt +++ b/requirements-django.txt @@ -1 +1,3 @@ -Django<3 +django < 3 +sentry-sdk + diff --git a/swh/auth/django/backends.py b/swh/auth/django/backends.py new file mode 100644 --- /dev/null +++ b/swh/auth/django/backends.py @@ -0,0 +1,112 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Optional + +from django.core.cache import cache +from django.http import HttpRequest +from django.utils import timezone +import sentry_sdk + +from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import ( + keycloak_oidc_client, + oidc_profile_cache_key, + oidc_user_from_profile, +) +from swh.auth.keycloak import KeycloakOpenIDConnect + + +def _update_cached_oidc_profile( + oidc_client: KeycloakOpenIDConnect, oidc_profile: Dict[str, Any], user: OIDCUser +) -> None: + """ + Update cached OIDC profile associated to a user if needed: when the profile + is not stored in cache or when the authentication tokens have changed. + + Args: + oidc_client: KeycloakOpenID wrapper + oidc_profile: OIDC profile used to authenticate a user + user: django model representing the authenticated user + """ + # put OIDC profile in cache or update it after token renewal + cache_key = oidc_profile_cache_key(oidc_client, user.id) + if ( + cache.get(cache_key) is None + or user.access_token != oidc_profile["access_token"] + ): + # set cache key TTL as refresh token expiration time + assert user.refresh_expires_at + ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) + + # save oidc_profile in cache + cache.set(cache_key, user.oidc_profile, timeout=max(0, ttl)) + + +class OIDCAuthorizationCodePKCEBackend: + """ + Django authentication backend using Keycloak OpenID Connect authorization + code flow with PKCE ("Proof Key for Code Exchange"). + + To use that backend globally in your django application, proceed as follow: + + * add ``"swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend"`` + to the ``AUTHENTICATION_BACKENDS`` django setting + + * configure Keycloak URL, realm and client by adding + ``SWH_AUTH_SERVER_URL``, ``SWH_AUTH_REALM_NAME`` and ``SWH_AUTH_CLIENT_ID`` + in django settings + + * add ``swh.auth.django.views.urlpatterns`` to your django application URLs + + * add an HTML link targeting the ``"oidc-login"`` django view in your + application views + + * once a user is logged in, add an HTML link targeting the ``"oidc-logout"`` + django view in your application views (a ``next_path`` query parameter + can be used to redirect to a view of choice once the user is logged out) + + """ + + def authenticate( + self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str + ) -> Optional[OIDCUser]: + + user = None + try: + oidc_client = keycloak_oidc_client() + # try to authenticate user with OIDC PKCE authorization code flow + oidc_profile = oidc_client.authorization_code( + code, redirect_uri, code_verifier=code_verifier + ) + + # create Django user + user = oidc_user_from_profile(oidc_client, oidc_profile) + + # update cached oidc profile if needed + _update_cached_oidc_profile(oidc_client, oidc_profile, user) + + except Exception as e: + sentry_sdk.capture_exception(e) + + return user + + def get_user(self, user_id: int) -> Optional[OIDCUser]: + # get oidc profile from cache + oidc_client = keycloak_oidc_client() + oidc_profile = cache.get(oidc_profile_cache_key(oidc_client, user_id)) + if oidc_profile: + try: + user = oidc_user_from_profile(oidc_client, oidc_profile) + # update cached oidc profile if needed + _update_cached_oidc_profile(oidc_client, oidc_profile, user) + # restore auth backend + setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") + return user + except Exception as e: + sentry_sdk.capture_exception(e) + return None + else: + return None diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py --- a/swh/auth/django/utils.py +++ b/swh/auth/django/utils.py @@ -7,9 +7,11 @@ from typing import Any, Dict, Optional from django.conf import settings +from django.http import HttpRequest, QueryDict +from django.urls import reverse as django_reverse from swh.auth.django.models import OIDCUser -from swh.auth.keycloak import KeycloakOpenIDConnect +from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect def oidc_user_from_decoded_token( @@ -76,7 +78,15 @@ """ # decode JWT token - decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) + try: + access_token = oidc_profile["access_token"] + decoded_token = oidc_client.decode_token(access_token) + # access token has expired + except ExpiredSignatureError: + # get a new access token from authentication provider + oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) + # decode access token + decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = oidc_user_from_decoded_token(decoded_token, client_id=oidc_client.client_id) @@ -99,13 +109,17 @@ return user +def oidc_profile_cache_key(oidc_client: KeycloakOpenIDConnect, user_id: int) -> str: + return f"oidc_user_{oidc_client.realm_name}_{oidc_client.client_id}_{user_id}" + + def keycloak_oidc_client() -> KeycloakOpenIDConnect: """ Instantiate a KeycloakOpenIDConnect class from the following django settings: - * KEYCLOAK_SERVER_URL - * KEYCLOAK_REALM_NAME - * KEYCLOAK_CLIENT_ID + * SWH_AUTH_SERVER_URL + * SWH_AUTH_REALM_NAME + * SWH_AUTH_CLIENT_ID Returns: An object to ease the interaction with the Keycloak server @@ -114,16 +128,62 @@ ValueError: at least one mandatory django setting is not set """ - server_url = getattr(settings, "KEYCLOAK_SERVER_URL", None) - realm_name = getattr(settings, "KEYCLOAK_REALM_NAME", None) - client_id = getattr(settings, "KEYCLOAK_CLIENT_ID", None) + server_url = getattr(settings, "SWH_AUTH_SERVER_URL", None) + realm_name = getattr(settings, "SWH_AUTH_REALM_NAME", None) + client_id = getattr(settings, "SWH_AUTH_CLIENT_ID", None) if server_url is None or realm_name is None or client_id is None: raise ValueError( - "KEYCLOAK_SERVER_URL, KEYCLOAK_REALM_NAME and KEYCLOAK_CLIENT_ID django " + "SWH_AUTH_SERVER_URL, SWH_AUTH_REALM_NAME and SWH_AUTH_CLIENT_ID django " "settings are mandatory to instantiate KeycloakOpenIDConnect class" ) return KeycloakOpenIDConnect( server_url=server_url, realm_name=realm_name, client_id=client_id ) + + +def reverse( + viewname: str, + url_args: Optional[Dict[str, Any]] = None, + query_params: Optional[Dict[str, Any]] = None, + current_app: Optional[str] = None, + urlconf: Optional[str] = None, + request: Optional[HttpRequest] = None, +) -> str: + """An override of django reverse function supporting query parameters. + + Args: + viewname: the name of the django view from which to compute a url + url_args: dictionary of url arguments indexed by their names + query_params: dictionary of query parameters to append to the + reversed url + current_app: the name of the django app tighten to the view + urlconf: url configuration module + request: build an absolute URI if provided + + Returns: + str: the url of the requested view with processed arguments and + query parameters + """ + + if url_args: + url_args = {k: v for k, v in url_args.items() if v is not None} + + url = django_reverse( + viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app + ) + + if query_params: + query_params = {k: v for k, v in query_params.items() if v is not None} + + if query_params and len(query_params) > 0: + query_dict = QueryDict("", mutable=True) + for k in sorted(query_params.keys()): + query_dict[k] = query_params[k] + url += "?" + query_dict.urlencode(safe="/;:") + + if request is not None: + url = request.build_absolute_uri(url) + + return url diff --git a/swh/auth/django/views.py b/swh/auth/django/views.py new file mode 100644 --- /dev/null +++ b/swh/auth/django/views.py @@ -0,0 +1,152 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, cast +import uuid + +from django.conf.urls import url +from django.contrib.auth import authenticate, login, logout +from django.core.cache import cache +from django.http import HttpRequest +from django.http.response import ( + HttpResponse, + HttpResponseBadRequest, + HttpResponseRedirect, + HttpResponseServerError, +) + +from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import keycloak_oidc_client, oidc_profile_cache_key, reverse +from swh.auth.keycloak import KeycloakError, keycloak_error_message +from swh.auth.utils import gen_oidc_pkce_codes + + +def oidc_login_view(request: HttpRequest, redirect_uri: str, scope: str = "openid"): + """ + Helper view function that initiates a login process using OIDC authorization + code flow with PKCE. + + OIDC session scope can be modified using the dedicated parameter. + """ + # generate a CSRF token + state = str(uuid.uuid4()) + + code_verifier, code_challenge = gen_oidc_pkce_codes() + + request.session["login_data"] = { + "code_verifier": code_verifier, + "state": state, + "redirect_uri": redirect_uri, + "next_path": request.GET.get("next_path", ""), + } + + authorization_url_params = { + "state": state, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "scope": scope, + } + + try: + oidc_client = keycloak_oidc_client() + authorization_url = oidc_client.authorization_url( + redirect_uri, **authorization_url_params + ) + except KeycloakError as ke: + return HttpResponseServerError(keycloak_error_message(ke)) + + return HttpResponseRedirect(authorization_url) + + +def get_oidc_login_data(request: HttpRequest) -> Dict[str, Any]: + """ + Check and get login data stored in django session. + """ + if "login_data" not in request.session: + raise Exception("Login process has not been initialized.") + + login_data = request.session["login_data"] + + if "code" not in request.GET or "state" not in request.GET: + raise ValueError("Missing query parameters for authentication.") + + # get CSRF token returned by OIDC server + state = request.GET["state"] + + if state != login_data["state"]: + raise ValueError("Wrong CSRF token, aborting login process.") + + return login_data + + +def oidc_login(request: HttpRequest) -> HttpResponse: + """ + Django view to initiate login process using OpenID Connect authorization + code flow with PKCE. + """ + + redirect_uri = reverse("oidc-login-complete", request=request) + + return oidc_login_view(request, redirect_uri=redirect_uri) + + +def oidc_login_complete(request: HttpRequest) -> HttpResponse: + """ + Django view to finalize login process using OpenID Connect authorization + code flow with PKCE. + """ + if "error" in request.GET: + return HttpResponseServerError(request.GET["error"]) + + try: + login_data = get_oidc_login_data(request) + except ValueError as ve: + return HttpResponseBadRequest(str(ve)) + except Exception as e: + return HttpResponseServerError(str(e)) + + next_path = login_data["next_path"] or request.build_absolute_uri("/") + + user = authenticate( + request=request, + code=request.GET["code"], + code_verifier=login_data["code_verifier"], + redirect_uri=login_data["redirect_uri"], + ) + + if user is None: + return HttpResponseServerError("User authentication failed.") + + login(request, user) + + return HttpResponseRedirect(next_path) + + +def oidc_logout(request: HttpRequest) -> HttpResponse: + """ + Django view to logout using OpenID Connect. + """ + user = request.user + logout(request) + if hasattr(user, "refresh_token"): + user = cast(OIDCUser, user) + refresh_token = cast(str, user.refresh_token) + try: + # end OpenID Connect session + oidc_client = keycloak_oidc_client() + oidc_client.logout(refresh_token) + except KeycloakError as ke: + return HttpResponseServerError(keycloak_error_message(ke)) + # remove user data from cache + cache.delete(oidc_profile_cache_key(oidc_client, user.id)) + + return HttpResponseRedirect(request.GET.get("next_path", "/")) + + +urlpatterns = [ + url(r"^oidc/login/$", oidc_login, name="oidc-login"), + url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"), + url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"), +] diff --git a/swh/auth/keycloak.py b/swh/auth/keycloak.py --- a/swh/auth/keycloak.py +++ b/swh/auth/keycloak.py @@ -7,10 +7,13 @@ from typing import Any, Dict, Optional from urllib.parse import urlencode +# add ExpiredSignatureError alias to avoid leaking jose import +# in swh-auth client code +from jose.jwt import ExpiredSignatureError # noqa from keycloak import KeycloakOpenID -# The next import is required to allow callers to catch on their own term the following -# exception +# add KeycloakError alias to avoid leaking keycloak import +# in swh-auth client code from keycloak.exceptions import KeycloakError # noqa from swh.core.config import load_from_envvar diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py --- a/swh/auth/pytest_plugin.py +++ b/swh/auth/pytest_plugin.py @@ -204,6 +204,17 @@ # generic keycloak fixture that can be used within tests # (cf. test_keycloak.py, test_utils.py, django related tests) # or external modules using that pytest plugin -keycloak_oidc = keycloak_oidc_factory( +_keycloak_oidc = keycloak_oidc_factory( server_url=SERVER_URL, realm_name=REALM_NAME, client_id=CLIENT_ID, ) + + +@pytest.fixture +def keycloak_oidc(_keycloak_oidc, mocker): + for oidc_client_factory in ( + "swh.auth.django.views.keycloak_oidc_client", + "swh.auth.django.backends.keycloak_oidc_client", + ): + keycloak_oidc_client = mocker.patch(oidc_client_factory) + keycloak_oidc_client.return_value = _keycloak_oidc + return _keycloak_oidc diff --git a/swh/auth/tests/conftest.py b/swh/auth/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/auth/tests/conftest.py @@ -0,0 +1,12 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + + +# Alias rf fixture from pytest-django +@pytest.fixture +def request_factory(rf): + return rf diff --git a/swh/auth/tests/django/app/apptest/settings.py b/swh/auth/tests/django/app/apptest/settings.py --- a/swh/auth/tests/django/app/apptest/settings.py +++ b/swh/auth/tests/django/app/apptest/settings.py @@ -1,7 +1,41 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.auth.tests.sample_data import CLIENT_ID, REALM_NAME, SERVER_URL + SECRET_KEY = "o+&ayiuk(y^wh4ijz5e=c2$$kjj7g^6r%z+8d*c0lbpfs##k#7" INSTALLED_APPS = [ "django.contrib.auth", "django.contrib.contenttypes", + "django.contrib.sessions", "swh.auth.tests.django.app.apptest", ] + +MIDDLEWARE = [ + "django.middleware.security.SecurityMiddleware", + "django.contrib.sessions.middleware.SessionMiddleware", + "django.middleware.common.CommonMiddleware", + "django.middleware.csrf.CsrfViewMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "django.contrib.messages.middleware.MessageMiddleware", + "django.middleware.clickjacking.XFrameOptionsMiddleware", +] + +ROOT_URLCONF = "swh.auth.tests.django.app.apptest.urls" + +DATABASES = { + "default": {"ENGINE": "django.db.backends.sqlite3", "NAME": "swh-auth-test-db",} +} + +SESSION_ENGINE = "django.contrib.sessions.backends.cache" + +AUTHENTICATION_BACKENDS = [ + "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend", +] + +SWH_AUTH_SERVER_URL = SERVER_URL +SWH_AUTH_REALM_NAME = REALM_NAME +SWH_AUTH_CLIENT_ID = CLIENT_ID diff --git a/swh/auth/tests/django/app/apptest/urls.py b/swh/auth/tests/django/app/apptest/urls.py --- a/swh/auth/tests/django/app/apptest/urls.py +++ b/swh/auth/tests/django/app/apptest/urls.py @@ -3,4 +3,14 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -urlpatterns = [] # type: ignore +from django.conf.urls import url +from django.http import HttpResponse + +from swh.auth.django.views import urlpatterns as auth_urlpatterns + + +def _root_view(request): + return HttpResponse("Hello World !") + + +urlpatterns = [url(r"^$", _root_view, name="root")] + auth_urlpatterns diff --git a/swh/auth/tests/django/django_asserts.py b/swh/auth/tests/django/django_asserts.py new file mode 100644 --- /dev/null +++ b/swh/auth/tests/django/django_asserts.py @@ -0,0 +1,21 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# https://github.com/pytest-dev/pytest-django/pull/709 proposes a more +# generic way to expose all asserts but it makes mypy unhappy. +# So explicitly expose the assertions we need for swh-web tests to +# avoid mypy errors + +""" +Expose some Django assertions to be used with pytest +""" + +from django.test import TestCase + +_test_case = TestCase("run") + +assert_template_used = _test_case.assertTemplateUsed +assert_contains = _test_case.assertContains +assert_not_contains = _test_case.assertNotContains diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py new file mode 100644 --- /dev/null +++ b/swh/auth/tests/django/test_backends.py @@ -0,0 +1,151 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime, timedelta +from unittest.mock import Mock + +from django.conf import settings +from django.contrib.auth import authenticate, get_backends +import pytest + +from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import reverse +from swh.auth.keycloak import ExpiredSignatureError + + +def _authenticate_user(request_factory): + request = request_factory.get(reverse("root")) + + return authenticate( + request=request, + code="some-code", + code_verifier="some-code-verifier", + redirect_uri="https://localhost:5004", + ) + + +def _check_authenticated_user(user, decoded_token, keycloak_oidc): + assert user is not None + assert isinstance(user, OIDCUser) + assert user.id != 0 + assert user.username == decoded_token["preferred_username"] + assert user.password == "" + assert user.first_name == decoded_token["given_name"] + assert user.last_name == decoded_token["family_name"] + assert user.email == decoded_token["email"] + assert user.is_staff == ("/staff" in decoded_token["groups"]) + assert user.sub == decoded_token["sub"] + resource_access = decoded_token.get("resource_access", {}) + resource_access_client = resource_access.get(keycloak_oidc.client_id, {}) + assert user.permissions == set(resource_access_client.get("roles", [])) + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_success(keycloak_oidc, request_factory): + """ + Checks successful login based on OpenID Connect with PKCE extension + Django authentication backend (login from Web UI). + """ + keycloak_oidc.user_groups = ["/staff"] + + oidc_profile = keycloak_oidc.login() + user = _authenticate_user(request_factory) + + decoded_token = keycloak_oidc.decode_token(user.access_token) + _check_authenticated_user(user, decoded_token, keycloak_oidc) + + auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) + exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) + refresh_exp_datetime = auth_datetime + timedelta( + seconds=oidc_profile["refresh_expires_in"] + ) + + assert user.access_token == oidc_profile["access_token"] + assert user.expires_at == exp_datetime + assert user.id_token == oidc_profile["id_token"] + assert user.refresh_token == oidc_profile["refresh_token"] + assert user.refresh_expires_at == refresh_exp_datetime + assert user.scope == oidc_profile["scope"] + assert user.session_state == oidc_profile["session_state"] + + backend_path = "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend" + assert user.backend == backend_path + backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) + assert get_backends()[backend_idx].get_user(user.id) == user + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_failure(keycloak_oidc, request_factory): + """ + Checks failed login based on OpenID Connect with PKCE extension Django + authentication backend (login from Web UI). + """ + keycloak_oidc.set_auth_success(False) + + user = _authenticate_user(request_factory) + + assert user is None + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_refresh_token_success( + keycloak_oidc, request_factory +): + """ + Checks access token renewal success using refresh token. + """ + + oidc_profile = keycloak_oidc.login() + decoded_token = keycloak_oidc.decode_token(oidc_profile["access_token"]) + + keycloak_oidc.decode_token = Mock() + keycloak_oidc.decode_token.side_effect = [ + ExpiredSignatureError("access token token has expired"), + decoded_token, + ] + + user = _authenticate_user(request_factory) + + oidc_profile = keycloak_oidc.login() + keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) + + assert user is not None + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_refresh_token_failure( + keycloak_oidc, request_factory +): + """ + Checks access token renewal failure using refresh token. + """ + keycloak_oidc.decode_token = Mock() + keycloak_oidc.decode_token.side_effect = ExpiredSignatureError( + "access token token has expired" + ) + keycloak_oidc.refresh_token.side_effect = Exception("OIDC session has expired") + + user = _authenticate_user(request_factory) + + oidc_profile = keycloak_oidc.login() + keycloak_oidc.refresh_token.assert_called_with(oidc_profile["refresh_token"]) + + assert user is None + + +@pytest.mark.django_db +def test_oidc_code_pkce_auth_backend_permissions(keycloak_oidc, request_factory): + """ + Checks that a permission defined with OpenID Connect is correctly mapped + to a Django one when logging from Web UI. + """ + permission = "webapp.some-permission" + keycloak_oidc.user_permissions = [permission] + user = _authenticate_user(request_factory) + assert user.has_perm(permission) + assert user.get_all_permissions() == {permission} + assert user.get_group_permissions() == {permission} + assert user.has_module_perms("webapp") + assert not user.has_module_perms("foo") diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py --- a/swh/auth/tests/django/test_utils.py +++ b/swh/auth/tests/django/test_utils.py @@ -97,6 +97,9 @@ _check_user(user) +@override_settings( + SWH_AUTH_SERVER_URL=None, SWH_AUTH_REALM_NAME=None, SWH_AUTH_CLIENT_ID=None, +) def test_keycloak_oidc_client_missing_django_settings(): with pytest.raises(ValueError, match="settings are mandatory"): @@ -104,9 +107,9 @@ @override_settings( - KEYCLOAK_SERVER_URL=SERVER_URL, - KEYCLOAK_REALM_NAME=REALM_NAME, - KEYCLOAK_CLIENT_ID=CLIENT_ID, + SWH_AUTH_SERVER_URL=SERVER_URL, + SWH_AUTH_REALM_NAME=REALM_NAME, + SWH_AUTH_CLIENT_ID=CLIENT_ID, ) def test_keycloak_oidc_client_parameters_from_django_settings(): diff --git a/swh/auth/tests/django/test_views.py b/swh/auth/tests/django/test_views.py new file mode 100644 --- /dev/null +++ b/swh/auth/tests/django/test_views.py @@ -0,0 +1,282 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json +from urllib.parse import urljoin, urlparse +import uuid + +from django.contrib.auth.models import AnonymousUser, User +from django.http import QueryDict +import pytest + +from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import reverse +from swh.auth.keycloak import KeycloakError +from swh.auth.tests.django.django_asserts import assert_contains +from swh.auth.tests.sample_data import CLIENT_ID + + +def _check_oidc_login_code_flow_data( + request, response, keycloak_oidc, redirect_uri, scope="openid" +): + parsed_url = urlparse(response["location"]) + + authorization_url = keycloak_oidc.well_known()["authorization_endpoint"] + query_dict = QueryDict(parsed_url.query) + + # check redirect url is valid + assert urljoin(response["location"], parsed_url.path) == authorization_url + assert "client_id" in query_dict + assert query_dict["client_id"] == CLIENT_ID + assert "response_type" in query_dict + assert query_dict["response_type"] == "code" + assert "redirect_uri" in query_dict + assert query_dict["redirect_uri"] == redirect_uri + assert "code_challenge_method" in query_dict + assert query_dict["code_challenge_method"] == "S256" + assert "scope" in query_dict + assert query_dict["scope"] == scope + assert "state" in query_dict + assert "code_challenge" in query_dict + + # check a login_data has been registered in user session + assert "login_data" in request.session + login_data = request.session["login_data"] + assert "code_verifier" in login_data + assert "state" in login_data + assert "redirect_uri" in login_data + assert login_data["redirect_uri"] == query_dict["redirect_uri"] + return login_data + + +@pytest.mark.django_db +def test_oidc_login_views_success(client, keycloak_oidc): + """ + Simulate a successful login authentication with OpenID Connect + authorization code flow with PKCE. + """ + # user initiates login process + login_url = reverse("oidc-login") + + # should redirect to Keycloak authentication page in order + # for a user to login with its username / password + response = client.get(login_url) + assert response.status_code == 302 + + request = response.wsgi_request + assert isinstance(request.user, AnonymousUser) + + login_data = _check_oidc_login_code_flow_data( + request, + response, + keycloak_oidc, + redirect_uri=reverse("oidc-login-complete", request=request), + ) + + # once a user has identified himself in Keycloak, he is + # redirected to the 'oidc-login-complete' view to + # login in Django. + + # generate authorization code / session state in the same + # manner as Keycloak + code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" + session_state = str(uuid.uuid4()) + + login_complete_url = reverse( + "oidc-login-complete", + query_params={ + "code": code, + "state": login_data["state"], + "session_state": session_state, + }, + ) + + # login process finalization, should redirect to root url by default + response = client.get(login_complete_url) + assert response.status_code == 302 + + request = response.wsgi_request + assert response["location"] == request.build_absolute_uri("/") + + # user should be authenticated + assert isinstance(request.user, OIDCUser) + + # check remote user has not been saved to Django database + with pytest.raises(User.DoesNotExist): + User.objects.get(username=request.user.username) + + +@pytest.mark.django_db +def test_oidc_logout_view_success(client, keycloak_oidc): + """ + Simulate a successful logout operation with OpenID Connect. + """ + # login our test user + client.login(code="", code_verifier="", redirect_uri="") + keycloak_oidc.authorization_code.assert_called() + + # user initiates logout + next_path = reverse("root") + oidc_logout_url = reverse("oidc-logout", query_params={"next_path": next_path}) + + # should redirect to logout page + response = client.get(oidc_logout_url) + assert response.status_code == 302 + + request = response.wsgi_request + assert response["location"] == next_path + + # should have been logged out in Keycloak + oidc_profile = keycloak_oidc.login() + keycloak_oidc.logout.assert_called_with(oidc_profile["refresh_token"]) + + # check effective logout in Django + assert isinstance(request.user, AnonymousUser) + + +@pytest.mark.django_db +def test_oidc_login_view_failure(client, keycloak_oidc): + """ + Simulate a failed authentication with OpenID Connect. + """ + keycloak_oidc.set_auth_success(False) + + # user initiates login process + login_url = reverse("oidc-login") + # should render an error page + response = client.get(login_url) + assert response.status_code == 500 + request = response.wsgi_request + + # no users should be logged in + assert isinstance(request.user, AnonymousUser) + + +# Simulate possible errors with OpenID Connect in the login complete view. + + +def test_oidc_login_complete_view_no_login_data(client): + # user initiates login process + login_url = reverse("oidc-login-complete") + # should return with error + response = client.get(login_url) + assert response.status_code == 500 + + assert_contains( + response, "Login process has not been initialized.", status_code=500 + ) + + +def test_oidc_login_complete_view_missing_parameters(client): + # simulate login process has been initialized + session = client.session + session["login_data"] = { + "code_verifier": "", + "state": str(uuid.uuid4()), + "redirect_uri": "", + "next_path": "", + } + session.save() + + # user initiates login process + login_url = reverse("oidc-login-complete") + + # should return with error + response = client.get(login_url) + assert response.status_code == 400 + request = response.wsgi_request + assert_contains( + response, "Missing query parameters for authentication.", status_code=400 + ) + + # no user should be logged in + assert isinstance(request.user, AnonymousUser) + + +def test_oidc_login_complete_wrong_csrf_token(client, keycloak_oidc): + # simulate login process has been initialized + session = client.session + session["login_data"] = { + "code_verifier": "", + "state": str(uuid.uuid4()), + "redirect_uri": "", + "next_path": "", + } + session.save() + + # user initiates login process + login_url = reverse( + "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"} + ) + + # should render an error page + response = client.get(login_url) + assert response.status_code == 400 + request = response.wsgi_request + assert_contains( + response, "Wrong CSRF token, aborting login process.", status_code=400 + ) + + # no user should be logged in + assert isinstance(request.user, AnonymousUser) + + +@pytest.mark.django_db +def test_oidc_login_complete_wrong_code_verifier(client, keycloak_oidc): + keycloak_oidc.set_auth_success(False) + + # simulate login process has been initialized + session = client.session + session["login_data"] = { + "code_verifier": "", + "state": str(uuid.uuid4()), + "redirect_uri": "", + "next_path": "", + } + session.save() + + # check authentication error is reported + login_url = reverse( + "oidc-login-complete", + query_params={"code": "some-code", "state": session["login_data"]["state"]}, + ) + + # should render an error page + response = client.get(login_url) + print(response.content) + assert response.status_code == 500 + + request = response.wsgi_request + assert_contains(response, "User authentication failed.", status_code=500) + + # no user should be logged in + assert isinstance(request.user, AnonymousUser) + + +@pytest.mark.django_db +def test_oidc_logout_view_failure(client, keycloak_oidc): + """ + Simulate a failed logout operation with OpenID Connect. + """ + # login our test user + client.login(code="", code_verifier="", redirect_uri="") + + error = "unknown_error" + error_message = json.dumps({"error": error}).encode() + keycloak_oidc.logout.side_effect = KeycloakError( + error_message=error_message, response_code=401 + ) + + # user initiates logout process + logout_url = reverse("oidc-logout") + + # should return with error + response = client.get(logout_url) + assert response.status_code == 500 + request = response.wsgi_request + assert_contains(response, error, status_code=500) + + # user should be logged out from Django anyway + assert isinstance(request.user, AnonymousUser) diff --git a/swh/auth/tests/test_utils.py b/swh/auth/tests/test_utils.py new file mode 100644 --- /dev/null +++ b/swh/auth/tests/test_utils.py @@ -0,0 +1,36 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from base64 import urlsafe_b64encode +import hashlib +import re + +from swh.auth.utils import gen_oidc_pkce_codes + + +def test_gen_oidc_pkce_codes(): + """ + Check generated PKCE codes respect the specification + (see https://tools.ietf.org/html/rfc7636#section-4.1) + """ + code_verifier, code_challenge = gen_oidc_pkce_codes() + + # check the code verifier only contains allowed characters + assert re.match(r"[a-zA-Z0-9-\._~]+", code_verifier) + + # check minimum and maximum authorized length for the + # code verifier + assert len(code_verifier) >= 43 + assert len(code_verifier) <= 128 + + # compute code challenge from code verifier + challenge = hashlib.sha256(code_verifier.encode("ascii")).digest() + challenge = urlsafe_b64encode(challenge).decode("ascii") + challenge = challenge.replace("=", "") + + # check base64 padding is not present + assert not code_challenge[-1].endswith("=") + # check code challenge is valid + assert code_challenge == challenge diff --git a/swh/auth/utils.py b/swh/auth/utils.py new file mode 100644 --- /dev/null +++ b/swh/auth/utils.py @@ -0,0 +1,35 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from base64 import urlsafe_b64encode +import hashlib +import secrets +from typing import Tuple + + +def gen_oidc_pkce_codes() -> Tuple[str, str]: + """ + Generates a code verifier and a code challenge to be used + with the OpenID Connect authorization code flow with PKCE + ("Proof Key for Code Exchange", see https://tools.ietf.org/html/rfc7636). + + PKCE replaces the static secret used in the standard authorization + code flow with a temporary one-time challenge, making it feasible + to use in public clients. + + The implementation is inspired from that blog post: + https://www.stefaanlippens.net/oauth-code-flow-pkce.html + """ + # generate a code verifier which is a long enough random alphanumeric + # string, only to be used "client side" + code_verifier_str = secrets.token_urlsafe(60) + + # create the PKCE code challenge by hashing the code verifier with SHA256 + # and encoding the result in URL-safe base64 (without padding) + code_challenge = hashlib.sha256(code_verifier_str.encode("ascii")).digest() + code_challenge_str = urlsafe_b64encode(code_challenge).decode("ascii") + code_challenge_str = code_challenge_str.replace("=", "") + + return code_verifier_str, code_challenge_str