diff --git a/swh/auth/django/views.py b/swh/auth/django/views.py index 741f53e..b463bec 100644 --- a/swh/auth/django/views.py +++ b/swh/auth/django/views.py @@ -1,152 +1,152 @@ -# Copyright (C) 2020-2021 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, cast import uuid -from django.conf.urls import url from django.contrib.auth import authenticate, login, logout from django.core.cache import cache from django.http import HttpRequest from django.http.response import ( HttpResponse, HttpResponseBadRequest, HttpResponseRedirect, HttpResponseServerError, ) +from django.urls import re_path as url from swh.auth.django.models import OIDCUser from swh.auth.django.utils import keycloak_oidc_client, oidc_profile_cache_key, reverse from swh.auth.keycloak import KeycloakError, keycloak_error_message from swh.auth.utils import gen_oidc_pkce_codes def oidc_login_view(request: HttpRequest, redirect_uri: str, scope: str = "openid"): """ Helper view function that initiates a login process using OIDC authorization code flow with PKCE. OIDC session scope can be modified using the dedicated parameter. """ # generate a CSRF token state = str(uuid.uuid4()) code_verifier, code_challenge = gen_oidc_pkce_codes() request.session["login_data"] = { "code_verifier": code_verifier, "state": state, "redirect_uri": redirect_uri, "next_path": request.GET.get("next_path", ""), } authorization_url_params = { "state": state, "code_challenge": code_challenge, "code_challenge_method": "S256", "scope": scope, } try: oidc_client = keycloak_oidc_client() authorization_url = oidc_client.authorization_url( redirect_uri, **authorization_url_params ) except KeycloakError as ke: return HttpResponseServerError(keycloak_error_message(ke)) return HttpResponseRedirect(authorization_url) def get_oidc_login_data(request: HttpRequest) -> Dict[str, Any]: """ Check and get login data stored in django session. """ if "login_data" not in request.session: raise Exception("Login process has not been initialized.") login_data = request.session["login_data"] if "code" not in request.GET or "state" not in request.GET: raise ValueError("Missing query parameters for authentication.") # get CSRF token returned by OIDC server state = request.GET["state"] if state != login_data["state"]: raise ValueError("Wrong CSRF token, aborting login process.") return login_data def oidc_login(request: HttpRequest) -> HttpResponse: """ Django view to initiate login process using OpenID Connect authorization code flow with PKCE. """ redirect_uri = reverse("oidc-login-complete", request=request) return oidc_login_view(request, redirect_uri=redirect_uri) def oidc_login_complete(request: HttpRequest) -> HttpResponse: """ Django view to finalize login process using OpenID Connect authorization code flow with PKCE. """ if "error" in request.GET: return HttpResponseServerError(request.GET["error"]) try: login_data = get_oidc_login_data(request) except ValueError as ve: return HttpResponseBadRequest(str(ve)) except Exception as e: return HttpResponseServerError(str(e)) next_path = login_data["next_path"] or request.build_absolute_uri("/") user = authenticate( request=request, code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) if user is None: return HttpResponseServerError("User authentication failed.") login(request, user) return HttpResponseRedirect(next_path) def oidc_logout(request: HttpRequest) -> HttpResponse: """ Django view to logout using OpenID Connect. """ user = request.user logout(request) if hasattr(user, "refresh_token"): user = cast(OIDCUser, user) refresh_token = cast(str, user.refresh_token) try: # end OpenID Connect session oidc_client = keycloak_oidc_client() oidc_client.logout(refresh_token) except KeycloakError as ke: return HttpResponseServerError(keycloak_error_message(ke)) # remove user data from cache cache.delete(oidc_profile_cache_key(oidc_client, user.id)) return HttpResponseRedirect(request.GET.get("next_path", "/")) urlpatterns = [ url(r"^oidc/login/$", oidc_login, name="oidc-login"), url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"), url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"), ] diff --git a/swh/auth/tests/django/app/apptest/urls.py b/swh/auth/tests/django/app/apptest/urls.py index 118937b..33ef978 100644 --- a/swh/auth/tests/django/app/apptest/urls.py +++ b/swh/auth/tests/django/app/apptest/urls.py @@ -1,28 +1,28 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.conf.urls import url from django.contrib.auth.views import LogoutView from django.http import HttpResponse +from django.urls import re_path as url from rest_framework.decorators import api_view from rest_framework.response import Response from swh.auth.django.views import urlpatterns as auth_urlpatterns def _root_view(request): return HttpResponse("Hello World !") @api_view() def _api_view_test(request): return Response({"message": "Hello World !"}) urlpatterns = [ url(r"^$", _root_view, name="root"), url(r"^api/test$", _api_view_test, name="api-test"), url(r"^logout/$", LogoutView.as_view(), name="logout"), ] + auth_urlpatterns