diff --git a/cypress/integration/api-tokens.spec.js b/cypress/integration/api-tokens.spec.js index 84acf2cd..24d470c6 100644 --- a/cypress/integration/api-tokens.spec.js +++ b/cypress/integration/api-tokens.spec.js @@ -1,211 +1,140 @@ /** * Copyright (C) 2020 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ describe('Test API tokens UI', function() { it('should ask for user to login', function() { cy.visit(`${this.Urls.oidc_profile()}#tokens`, {failOnStatusCode: false}); cy.location().should(loc => { expect(loc.pathname).to.eq(this.Urls.oidc_login()); }); }); function initTokensPage(Urls, tokens) { cy.server(); cy.route({ method: 'GET', url: `${Urls.oidc_list_bearer_tokens()}/**`, response: { 'recordsTotal': tokens.length, 'draw': 2, 'recordsFiltered': tokens.length, 'data': tokens } }); // the tested UI should not be accessible for standard Django users // but we need a user logged in for testing it cy.adminLogin(); cy.visit(`${Urls.oidc_profile()}#tokens`); } - function generateToken(Urls, status, tokenValue = '') { - cy.route({ - method: 'POST', - url: `${Urls.oidc_generate_bearer_token()}/**`, - response: tokenValue, - status: status - }).as('generateTokenRequest'); + it('should initiate token generation flow', function() { + initTokensPage(this.Urls, []); cy.contains('Generate new token') .click(); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Bearer token generation'); - cy.get('#swh-user-password-submit') - .should('be.disabled'); - - cy.get('#swh-user-password') - .type('secret'); - - cy.get('#swh-user-password-submit') - .should('be.enabled'); - - cy.get('#swh-user-password-submit') + cy.get('#swh-token-form-submit') .click(); - cy.wait('@generateTokenRequest'); - - if (status === 200) { - cy.get('#swh-user-password-submit') - .should('be.disabled'); - } - } - - it('should generate and display bearer token', function() { - initTokensPage(this.Urls, []); - const tokenValue = 'bearer-token-value'; - generateToken(this.Urls, 200, tokenValue); - cy.get('#swh-token-success-message') - .should('contain', 'Below is your token'); - cy.get('#swh-bearer-token') - .should('contain', tokenValue); + cy.location().should(loc => { + expect(loc.pathname).to.eq(this.Urls.oidc_generate_bearer_token()); + }); }); - it('should report errors when token generation failed', function() { - initTokensPage(this.Urls, []); - generateToken(this.Urls, 400); - cy.get('#swh-token-error-message') + it('should report error when not logged in and visiting a token generation URL', function() { + cy.visit(this.Urls.oidc_generate_bearer_token_complete(), {failOnStatusCode: false}); + cy.get('.swh-http-error') + .should('be.visible'); + cy.get('.swh-http-error-code') + .should('contain', 403); + cy.get('.swh-http-error-desc') .should('contain', 'You are not allowed to generate bearer tokens'); - cy.get('#swh-web-modal-html .close').click(); - generateToken(this.Urls, 401); - cy.get('#swh-token-error-message') - .should('contain', 'The password is invalid'); - cy.get('#swh-web-modal-html .close').click(); - generateToken(this.Urls, 500); - cy.get('#swh-token-error-message') - .should('contain', 'Internal server error'); - }); function displayToken(Urls, status, tokenValue = '') { cy.route({ method: 'POST', url: `${Urls.oidc_get_bearer_token()}/**`, response: tokenValue, status: status }).as('getTokenRequest'); cy.contains('Display token') .click({force: true}); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Display bearer token'); - - cy.get('#swh-user-password-submit') - .should('be.disabled'); - - cy.get('#swh-user-password') - .type('secret', {force: true}); - - cy.get('#swh-user-password-submit') - .should('be.enabled'); - - cy.get('#swh-user-password-submit') - .click({force: true}); - - cy.wait('@getTokenRequest'); - - if (status === 200) { - cy.get('#swh-user-password-submit') - .should('be.disabled'); - } } it('should show a token when requested', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); const tokenValue = 'token-value'; displayToken(this.Urls, 200, tokenValue); - cy.get('#swh-token-success-message') - .should('contain', 'Below is your token'); cy.get('#swh-bearer-token') .should('contain', tokenValue); }); it('should report errors when token display failed', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); - displayToken(this.Urls, 401); - cy.get('#swh-token-error-message') - .should('contain', 'The password is invalid'); - cy.get('#swh-web-modal-html .close').click(); displayToken(this.Urls, 500); - cy.get('#swh-token-error-message') + cy.get('.modal-body') .should('contain', 'Internal server error'); }); function revokeToken(Urls, status) { cy.route({ method: 'POST', url: `${Urls.oidc_revoke_bearer_tokens()}/**`, response: '', status: status }).as('revokeTokenRequest'); cy.contains('Revoke token') .click({force: true}); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Revoke bearer token'); - cy.get('#swh-user-password-submit') - .should('be.disabled'); - - cy.get('#swh-user-password') - .type('secret', {force: true}); - - cy.get('#swh-user-password-submit') - .should('be.enabled'); - - cy.get('#swh-user-password-submit') + cy.get('#swh-token-form-submit') .click({force: true}); cy.wait('@revokeTokenRequest'); if (status === 200) { - cy.get('#swh-user-password-submit') + cy.get('#swh-token-form-submit') .should('be.disabled'); } } it('should revoke a token when requested', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); revokeToken(this.Urls, 200); - cy.get('#swh-token-success-message') + cy.get('#swh-token-form-message') .should('contain', 'Bearer token successfully revoked'); }); it('should report errors when token revoke failed', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); - revokeToken(this.Urls, 401); - cy.get('#swh-token-error-message') - .should('contain', 'The password is invalid'); - cy.get('#swh-web-modal-html .close').click(); revokeToken(this.Urls, 500); cy.get('#swh-token-error-message') .should('contain', 'Internal server error'); }); }); diff --git a/swh/web/assets/src/bundles/auth/index.js b/swh/web/assets/src/bundles/auth/index.js index afcf5fe6..4156b4b9 100644 --- a/swh/web/assets/src/bundles/auth/index.js +++ b/swh/web/assets/src/bundles/auth/index.js @@ -1,226 +1,187 @@ /** * Copyright (C) 2020 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ import {handleFetchError, csrfPost, removeUrlFragment} from 'utils/functions'; import './auth.css'; let apiTokensTable; -function updateSubmitButtonState() { - const val = $('#swh-user-password').val(); - $('#swh-user-password-submit').prop('disabled', val.length === 0); -} - -function passwordForm(infoText, buttonText) { +function tokenForm(infoText, buttonText) { const form = - `
-

${infoText}

- - - + ` +

${infoText}

+ +
`; return form; } function errorMessage(message) { - return `

${message}

`; + return `

${message}

`; } function successMessage(message) { - return `

${message}

`; + return `

${message}

`; } function disableSubmitButton() { - $('#swh-user-password-submit').prop('disabled', true); - $('#swh-user-password').off('change'); - $('#swh-user-password').off('keyup'); + $('#swh-token-form-submit').prop('disabled', true); } function generateToken() { - csrfPost(Urls.oidc_generate_bearer_token(), {}, - JSON.stringify({password: $('#swh-user-password').val()})) - .then(handleFetchError) - .then(response => response.text()) - .then(token => { - disableSubmitButton(); - const tokenHtml = - `${successMessage('Below is your token.')} -
${token}
`; - $(`#swh-password-form`).append(tokenHtml); - apiTokensTable.draw(); - }) - .catch(response => { - if (response.status === 400) { - $(`#swh-password-form`).append( - errorMessage('You are not allowed to generate bearer tokens.')); - } else if (response.status === 401) { - $(`#swh-password-form`).append(errorMessage('The password is invalid.')); - } else { - $(`#swh-password-form`).append(errorMessage('Internal server error.')); - } - }); + window.location = Urls.oidc_generate_bearer_token(); } function displayToken(tokenId) { const postData = { - password: $('#swh-user-password').val(), token_id: tokenId }; csrfPost(Urls.oidc_get_bearer_token(), {}, JSON.stringify(postData)) .then(handleFetchError) .then(response => response.text()) .then(token => { - disableSubmitButton(); const tokenHtml = - `${successMessage('Below is your token.')} -
${token}
`; - $(`#swh-password-form`).append(tokenHtml); + `

Below is your token.

+
${token}
`; + swh.webapp.showModalHtml('Display bearer token', tokenHtml); }) - .catch(response => { - if (response.status === 401) { - $(`#swh-password-form`).append(errorMessage('The password is invalid.')); - } else { - $(`#swh-password-form`).append(errorMessage('Internal server error.')); - } + .catch(() => { + swh.webapp.showModalHtml('Display bearer token', errorMessage('Internal server error.')); }); } function revokeTokens(tokenIds) { const postData = { - password: $('#swh-user-password').val(), token_ids: tokenIds }; csrfPost(Urls.oidc_revoke_bearer_tokens(), {}, JSON.stringify(postData)) .then(handleFetchError) .then(() => { disableSubmitButton(); - $(`#swh-password-form`).append( - successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked`)); + $('#swh-token-form-message').html( + successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked.`)); apiTokensTable.draw(); }) - .catch(response => { - if (response.status === 401) { - $(`#swh-password-form`).append(errorMessage('The password is invalid.')); - } else { - $(`#swh-password-form`).append(errorMessage('Internal server error.')); - } + .catch(() => { + $('#swh-token-form-message').html(errorMessage('Internal server error.')); }); } function revokeToken(tokenId) { revokeTokens([tokenId]); } function revokeAllTokens() { const tokenIds = []; const rowsData = apiTokensTable.rows().data(); for (let i = 0; i < rowsData.length; ++i) { tokenIds.push(rowsData[i].id); } revokeTokens(tokenIds); } export function applyTokenAction(action, tokenId) { const actionData = { + display: { + submitCallback: displayToken + }, generate: { modalTitle: 'Bearer token generation', - infoText: 'Enter your password and click on the button to generate the token.', + infoText: 'Click on the button to generate the token. You will be redirected to ' + + 'Software Heritage Authentication Service and might be asked to enter ' + + 'your password again.', buttonText: 'Generate token', submitCallback: generateToken }, - display: { - modalTitle: 'Display bearer token', - infoText: 'Enter your password and click on the button to display the token.', - buttonText: 'Display token', - submitCallback: displayToken - }, revoke: { modalTitle: 'Revoke bearer token', - infoText: 'Enter your password and click on the button to revoke the token.', + infoText: 'Click on the button to revoke the token.', buttonText: 'Revoke token', submitCallback: revokeToken }, revokeAll: { modalTitle: 'Revoke all bearer tokens', - infoText: 'Enter your password and click on the button to revoke all tokens.', + infoText: 'Click on the button to revoke all tokens.', buttonText: 'Revoke tokens', submitCallback: revokeAllTokens } }; if (!actionData[action]) { return; } - const passwordFormHtml = passwordForm( - actionData[action].infoText, actionData[action].buttonText); + if (action !== 'display') { + const tokenFormHtml = tokenForm( + actionData[action].infoText, actionData[action].buttonText); - swh.webapp.showModalHtml(actionData[action].modalTitle, passwordFormHtml); - $('#swh-user-password').change(updateSubmitButtonState); - $('#swh-user-password').keyup(updateSubmitButtonState); - $(`#swh-password-form`).submit(event => { - event.preventDefault(); - event.stopPropagation(); + swh.webapp.showModalHtml(actionData[action].modalTitle, tokenFormHtml); + $(`#swh-token-form`).submit(event => { + event.preventDefault(); + event.stopPropagation(); + actionData[action].submitCallback(tokenId); + }); + } else { actionData[action].submitCallback(tokenId); - }); + } } export function initProfilePage() { $(document).ready(() => { apiTokensTable = $('#swh-bearer-tokens-table') .on('error.dt', (e, settings, techNote, message) => { $('#swh-origin-save-request-list-error').text( 'An error occurred while retrieving the tokens list'); console.log(message); }) .DataTable({ serverSide: true, ajax: Urls.oidc_list_bearer_tokens(), columns: [ { data: 'creation_date', name: 'creation_date', render: (data, type, row) => { if (type === 'display') { let date = new Date(data); return date.toLocaleString(); } return data; } }, { render: (data, type, row) => { const html = ` `; return html; } } ], ordering: false, searching: false, scrollY: '50vh', scrollCollapse: true }); $('#swh-oidc-profile-tokens-tab').on('shown.bs.tab', () => { apiTokensTable.draw(); window.location.hash = '#tokens'; }); $('#swh-oidc-profile-account-tab').on('shown.bs.tab', () => { removeUrlFragment(); }); if (window.location.hash === '#tokens') { $('.nav-tabs a[href="#swh-oidc-profile-tokens"]').tab('show'); } }); } diff --git a/swh/web/auth/keycloak.py b/swh/web/auth/keycloak.py index 2831931b..e457a6cc 100644 --- a/swh/web/auth/keycloak.py +++ b/swh/web/auth/keycloak.py @@ -1,189 +1,168 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Optional, Tuple from urllib.parse import urlencode from keycloak import KeycloakOpenID class KeycloakOpenIDConnect: """ Wrapper class around python-keycloak to ease the interaction with Keycloak for managing authentication and user permissions with OpenID Connect. """ def __init__( self, server_url: str, realm_name: str, client_id: str, realm_public_key: str = "", ): """ Args: server_url: URL of the Keycloak server realm_name: The realm name client_id: The OpenID Connect client identifier realm_public_key: The realm public key (will be dynamically retrieved if not provided) """ self._keycloak = KeycloakOpenID( server_url=server_url, client_id=client_id, realm_name=realm_name, ) self.server_url = server_url self.realm_name = realm_name self.client_id = client_id self.realm_public_key = realm_public_key def well_known(self) -> Dict[str, Any]: """ Retrieve the OpenID Connect Well-Known URI registry from Keycloak. Returns: A dictionary filled with OpenID Connect URIS. """ return self._keycloak.well_know() def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: """ Get OpenID Connect authorization URL to authenticate users. Args: redirect_uri: URI to redirect to once a user is authenticated extra_params: Extra query parameters to add to the authorization URL """ auth_url = self._keycloak.auth_url(redirect_uri) if extra_params: auth_url += "&%s" % urlencode(extra_params) return auth_url def authorization_code( self, code: str, redirect_uri: str, **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Authorization Code flow. Args: code: Authorization code provided by Keycloak redirect_uri: URI to redirect to once a user is authenticated (must be the same as the one provided to authorization_url): extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="authorization_code", code=code, redirect_uri=redirect_uri, **extra_params, ) - def offline_token(self, username: str, password: str) -> str: - """ - Generate an OpenID Connect offline refresh token. - - Offline tokens are a special type of refresh tokens with long-lived period. - It enables to open a new authenticated session without having to login again. - - Args: - username: username in the Keycloak realm - password: password associated to the username - - Returns: - An offline refresh token - """ - return self._keycloak.token( - grant_type="password", - scope="openid offline_access", - username=username, - password=password, - )["refresh_token"] - def refresh_token(self, refresh_token: str) -> Dict[str, Any]: """ Request a new access token from Keycloak using a refresh token. Args: refresh_token: A refresh token provided by Keycloak Returns: A dictionary filled with tokens info """ return self._keycloak.refresh_token(refresh_token) def decode_token( self, token: str, options: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Try to decode a JWT token. Args: token: A JWT token to decode options: Options for jose.jwt.decode Returns: A dictionary filled with decoded token content """ if not self.realm_public_key: realm_public_key = self._keycloak.public_key() self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" self.realm_public_key += realm_public_key self.realm_public_key += "\n-----END PUBLIC KEY-----" return self._keycloak.decode_token( token, key=self.realm_public_key, options=options ) def logout(self, refresh_token: str) -> None: """ Logout a user by closing its authenticated session. Args: refresh_token: A refresh token provided by Keycloak """ self._keycloak.logout(refresh_token) def userinfo(self, access_token: str) -> Dict[str, Any]: """ Return user information from its access token. Args: access_token: An access token provided by Keycloak Returns: A dictionary fillled with user information """ return self._keycloak.userinfo(access_token) # stores instances of KeycloakOpenIDConnect class # dict keys are (realm_name, client_id) tuples _keycloak_oidc: Dict[Tuple[str, str], KeycloakOpenIDConnect] = {} def get_keycloak_oidc_client( server_url: str, realm_name: str, client_id: str ) -> KeycloakOpenIDConnect: """ Instantiate a KeycloakOpenIDConnect class for a given client in a given realm. Args: server_url: Base URL of a Keycloak server realm_name: Name of the realm in Keycloak client_id: Client identifier in the realm Returns: An object to ease the interaction with the Keycloak server """ realm_client_key = (realm_name, client_id) if realm_client_key not in _keycloak_oidc: _keycloak_oidc[realm_client_key] = KeycloakOpenIDConnect( server_url, realm_name, client_id ) return _keycloak_oidc[realm_client_key] diff --git a/swh/web/auth/migrations/0002_remove_stored_tokens.py b/swh/web/auth/migrations/0002_remove_stored_tokens.py new file mode 100644 index 00000000..5e1100a7 --- /dev/null +++ b/swh/web/auth/migrations/0002_remove_stored_tokens.py @@ -0,0 +1,22 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from django.db import migrations + +from swh.web.auth.models import OIDCUserOfflineTokens + + +def _remove_stored_encrypted_tokens(apps, schema_editor): + OIDCUserOfflineTokens.objects.all().delete() + + +class Migration(migrations.Migration): + + dependencies = [ + ("swh.web.auth", "0001_initial"), + ] + + operations = [migrations.RunPython(_remove_stored_encrypted_tokens)] diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index 8f019a99..02fe63d6 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,251 +1,281 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, cast import uuid from cryptography.fernet import InvalidToken -from keycloak.exceptions import KeycloakError -import sentry_sdk from django.conf.urls import url from django.contrib.auth import authenticate, login, logout from django.contrib.auth.decorators import login_required from django.core.cache import cache from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( HttpResponse, HttpResponseForbidden, HttpResponseRedirect, - HttpResponseServerError, JsonResponse, ) from django.shortcuts import render from django.views.decorators.http import require_http_methods from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens from swh.web.auth.utils import ( decrypt_data, encrypt_data, gen_oidc_pkce_codes, get_oidc_client, ) -from swh.web.common.exc import BadInputExc +from swh.web.common.exc import BadInputExc, ForbiddenExc from swh.web.common.utils import reverse +from swh.web.config import get_config -def oidc_login(request: HttpRequest) -> HttpResponse: - """ - Django view to initiate login process using OpenID Connect. - """ +def _oidc_login(request: HttpRequest, redirect_uri: str, scope: str = "openid"): # generate a CSRF token state = str(uuid.uuid4()) - redirect_uri = reverse("oidc-login-complete", request=request) code_verifier, code_challenge = gen_oidc_pkce_codes() request.session["login_data"] = { "code_verifier": code_verifier, "state": state, "redirect_uri": redirect_uri, "next_path": request.GET.get("next_path", ""), "prompt": request.GET.get("prompt", ""), } authorization_url_params = { "state": state, "code_challenge": code_challenge, "code_challenge_method": "S256", - "scope": "openid", + "scope": scope, "prompt": request.GET.get("prompt", ""), } oidc_client = get_oidc_client() authorization_url = oidc_client.authorization_url( redirect_uri, **authorization_url_params ) return HttpResponseRedirect(authorization_url) -def oidc_login_complete(request: HttpRequest) -> HttpResponse: +def oidc_login(request: HttpRequest) -> HttpResponse: """ - Django view to finalize login process using OpenID Connect. + Django view to initiate login process using OpenID Connect. """ + + redirect_uri = reverse("oidc-login-complete", request=request) + + return _oidc_login(request, redirect_uri=redirect_uri) + + +def _get_login_data(request: HttpRequest) -> Dict[str, Any]: if "login_data" not in request.session: raise Exception("Login process has not been initialized.") - login_data = request.session["login_data"] - next_path = login_data["next_path"] or request.build_absolute_uri("/") + return request.session["login_data"] - if "error" in request.GET: - if login_data["prompt"] == "none": - # Silent login failed because OIDC session expired. - # Redirect to logout page and inform user. - logout(request) - logout_url = reverse( - "logout", query_params={"next_path": next_path, "remote_user": 1} - ) - return HttpResponseRedirect(logout_url) - return HttpResponseServerError(request.GET["error"]) + +def _check_login_data(request: HttpRequest, login_data: Dict[str, Any]): if "code" not in request.GET or "state" not in request.GET: raise BadInputExc("Missing query parameters for authentication.") # get CSRF token returned by OIDC server state = request.GET["state"] if state != login_data["state"]: raise BadInputExc("Wrong CSRF token, aborting login process.") + +def oidc_login_complete(request: HttpRequest) -> HttpResponse: + """ + Django view to finalize login process using OpenID Connect. + """ + login_data = _get_login_data(request) + next_path = login_data["next_path"] or request.build_absolute_uri("/") + if "error" in request.GET and login_data["prompt"] == "none": + # Silent login failed because OIDC session expired. + # Redirect to logout page and inform user. + logout(request) + logout_url = reverse( + "logout", query_params={"next_path": next_path, "remote_user": 1} + ) + return HttpResponseRedirect(logout_url) + elif "error" in request.GET: + raise Exception(request.GET["error"]) + + _check_login_data(request, login_data) + user = authenticate( request=request, code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) if user is None: raise Exception("User authentication failed.") login(request, user) return HttpResponseRedirect(next_path) def oidc_logout(request: HttpRequest) -> HttpResponse: """ Django view to logout using OpenID Connect. """ user = request.user logout(request) if hasattr(user, "refresh_token"): oidc_client = get_oidc_client() user = cast(OIDCUser, user) refresh_token = cast(str, user.refresh_token) # end OpenID Connect session oidc_client.logout(refresh_token) # remove user data from cache cache.delete(f"oidc_user_{user.id}") logout_url = reverse("logout", query_params={"remote_user": 1}) return HttpResponseRedirect(request.build_absolute_uri(logout_url)) -@require_http_methods(["POST"]) def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() - try: - data = json.loads(request.body.decode("utf-8")) - user = cast(OIDCUser, request.user) - oidc_client = get_oidc_client() - token = oidc_client.offline_token(user.username, data["password"]) - password = data["password"].encode() - salt = user.sub.encode() - encrypted_token = encrypt_data(token.encode(), password, salt) - OIDCUserOfflineTokens.objects.create( - user_id=str(user.id), offline_token=encrypted_token - ).save() - return HttpResponse(token, content_type="text/plain") - except KeycloakError as e: - sentry_sdk.capture_exception(e) - return HttpResponse(status=e.response_code or 500) + redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) + return _oidc_login( + request, redirect_uri=redirect_uri, scope="openid offline_access" + ) + + +def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse: + if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): + raise ForbiddenExc("You are not allowed to generate bearer tokens.") + if "error" in request.GET: + raise Exception(request.GET["error"]) + + oidc_client = get_oidc_client() + login_data = _get_login_data(request) + _check_login_data(request, login_data) + oidc_profile = oidc_client.authorization_code( + code=request.GET["code"], + code_verifier=login_data["code_verifier"], + redirect_uri=login_data["redirect_uri"], + ) + user = cast(OIDCUser, request.user) + token = oidc_profile["refresh_token"] + secret = get_config()["secret_key"].encode() + salt = user.sub.encode() + encrypted_token = encrypt_data(token.encode(), secret, salt) + OIDCUserOfflineTokens.objects.create( + user_id=str(user.id), offline_token=encrypted_token + ).save() + return HttpResponseRedirect(reverse("oidc-profile") + "#tokens") def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id)) tokens = tokens.order_by("-creation_date") length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(tokens, length) tokens_data = [ {"id": t.id, "creation_date": t.creation_date.isoformat()} for t in paginator.page(int(page)).object_list ] table_data: Dict[str, Any] = {} table_data["recordsTotal"] = len(tokens_data) table_data["draw"] = int(request.GET["draw"]) table_data["data"] = tokens_data table_data["recordsFiltered"] = len(tokens_data) return JsonResponse(table_data) @require_http_methods(["POST"]) def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: - data = json.loads(request.body.decode("utf-8")) + data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) - password = data["password"].encode() + secret = get_config()["secret_key"].encode() salt = user.sub.encode() - decrypted_token = decrypt_data(token_data.offline_token, password, salt) + decrypted_token = decrypt_data(token_data.offline_token, secret, salt) return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") except InvalidToken: return HttpResponse(status=401) @require_http_methods(["POST"]) def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: - data = json.loads(request.body.decode("utf-8")) + data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) - password = data["password"].encode() + secret = get_config()["secret_key"].encode() salt = user.sub.encode() - decrypted_token = decrypt_data(token_data.offline_token, password, salt) + decrypted_token = decrypt_data(token_data.offline_token, secret, salt) oidc_client = get_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) except InvalidToken: return HttpResponse(status=401) @login_required(login_url="/oidc/login/", redirect_field_name="next_path") def _oidc_profile_view(request: HttpRequest) -> HttpResponse: return render(request, "auth/profile.html") urlpatterns = [ url(r"^oidc/login/$", oidc_login, name="oidc-login"), url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"), url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"), url( r"^oidc/generate-bearer-token/$", oidc_generate_bearer_token, name="oidc-generate-bearer-token", ), + url( + r"^oidc/generate-bearer-token-complete/$", + oidc_generate_bearer_token_complete, + name="oidc-generate-bearer-token-complete", + ), url( r"^oidc/list-bearer-token/$", oidc_list_bearer_tokens, name="oidc-list-bearer-tokens", ), url( r"^oidc/get-bearer-token/$", oidc_get_bearer_token, name="oidc-get-bearer-token", ), url( r"^oidc/revoke-bearer-tokens/$", oidc_revoke_bearer_tokens, name="oidc-revoke-bearer-tokens", ), url(r"^oidc/profile/$", _oidc_profile_view, name="oidc-profile",), ] diff --git a/swh/web/settings/tests.py b/swh/web/settings/tests.py index f4c6e31c..aa9e5701 100644 --- a/swh/web/settings/tests.py +++ b/swh/web/settings/tests.py @@ -1,109 +1,109 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """ Django tests settings for swh-web. """ import os import sys from swh.web.config import SWH_WEB_INTERNAL_SERVER_NAME, get_config scope1_limiter_rate = 3 scope1_limiter_rate_post = 1 scope2_limiter_rate = 5 scope2_limiter_rate_post = 2 scope3_limiter_rate = 1 scope3_limiter_rate_post = 1 save_origin_rate_post = 10 swh_web_config = get_config() swh_web_config.update( { # disable django debug mode when running cypress tests "debug": "pytest" in sys.argv[0] or "PYTEST_XDIST_WORKER" in os.environ, "secret_key": "test", "history_counters_url": "", "throttling": { "cache_uri": None, "scopes": { "swh_api": { "limiter_rate": {"default": "60/min"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_api_origin_search": { "limiter_rate": {"default": "100/min"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_api_origin_visit_latest": { "limiter_rate": {"default": "6000/min"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_vault_cooking": { "limiter_rate": {"default": "120/h", "GET": "60/m"}, "exempted_networks": ["127.0.0.0/8"], }, "swh_save_origin": { "limiter_rate": { "default": "120/h", "POST": "%s/h" % save_origin_rate_post, } }, "scope1": { "limiter_rate": { "default": "%s/min" % scope1_limiter_rate, "POST": "%s/min" % scope1_limiter_rate_post, } }, "scope2": { "limiter_rate": { "default": "%s/min" % scope2_limiter_rate, "POST": "%s/min" % scope2_limiter_rate_post, } }, "scope3": { "limiter_rate": { "default": "%s/min" % scope3_limiter_rate, "POST": "%s/min" % scope3_limiter_rate_post, }, "exempted_networks": ["127.0.0.0/8"], }, }, }, "keycloak": { - "server_url": "http://localhost:8080/auth", + "server_url": "http://localhost:8080/auth/", "realm_name": "SoftwareHeritage", }, } ) from .common import * # noqa from .common import ALLOWED_HOSTS, LOGGING # noqa, isort: skip DATABASES = { "default": { "ENGINE": "django.db.backends.sqlite3", "NAME": swh_web_config["test_db"], } } # when not running unit tests, make the webapp fetch data from memory storages if "pytest" not in sys.argv[0] and "PYTEST_XDIST_WORKER" not in os.environ: swh_web_config.update({"debug": True, "e2e_tests_mode": True}) from swh.web.tests.data import get_tests_data, override_storages test_data = get_tests_data() override_storages( test_data["storage"], test_data["idx_storage"], test_data["search"] ) else: ALLOWED_HOSTS += ["testserver", SWH_WEB_INTERNAL_SERVER_NAME] # Silent DEBUG output when running unit tests LOGGING["handlers"]["console"]["level"] = "INFO" # type: ignore diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py index 2b085816..093d12ff 100644 --- a/swh/web/tests/auth/keycloak_mock.py +++ b/swh/web/tests/auth/keycloak_mock.py @@ -1,120 +1,117 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from unittest.mock import Mock from keycloak.exceptions import KeycloakError from django.utils import timezone from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.config import get_config from .sample_data import oidc_profile, realm_public_key, userinfo class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): def __init__( self, auth_success=True, exp=None, user_groups=[], user_permissions=[] ): swhweb_config = get_config() super().__init__( swhweb_config["keycloak"]["server_url"], swhweb_config["keycloak"]["realm_name"], OIDC_SWH_WEB_CLIENT_ID, ) self.auth_success = auth_success self.exp = exp self.user_groups = user_groups self.user_permissions = user_permissions self._keycloak.public_key = lambda: realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } self.authorization_code = Mock() self.refresh_token = Mock() self.userinfo = Mock() self.logout = Mock() - self.offline_token = Mock() if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.refresh_token.return_value = copy(oidc_profile) self.userinfo.return_value = copy(userinfo) - self.offline_token.return_value = oidc_profile["refresh_token"] else: self.authorization_url = Mock() exception = KeycloakError( error_message="Authentication failed", response_code=401 ) self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception - self.offline_token = exception def decode_token(self, token): options = {} if self.auth_success: # skip signature expiration check as we use a static oidc_profile # for the tests with expired tokens in it options["verify_exp"] = False decoded = super().decode_token(token, options) # tweak auth and exp time for tests expire_in = decoded["exp"] - decoded["auth_time"] if self.exp is not None: decoded["exp"] = self.exp decoded["auth_time"] = self.exp - expire_in else: decoded["auth_time"] = int(timezone.now().timestamp()) decoded["exp"] = decoded["auth_time"] + expire_in decoded["groups"] = self.user_groups if self.user_permissions: decoded["resource_access"][self.client_id] = { "roles": self.user_permissions } return decoded def mock_keycloak( mocker, auth_success=True, exp=None, user_groups=[], user_permissions=[] ): kc_oidc_mock = KeycloackOpenIDConnectMock( auth_success, exp, user_groups, user_permissions ) mock_get_oidc_client = mocker.patch("swh.web.auth.views.get_oidc_client") mock_get_oidc_client.return_value = kc_oidc_mock mocker.patch("swh.web.auth.backends._oidc_client", kc_oidc_mock) return kc_oidc_mock diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index a05c6d07..b9f9c11e 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,566 +1,562 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from urllib.parse import urljoin, urlparse import uuid -from keycloak.exceptions import KeycloakError import pytest from django.contrib.auth.models import AnonymousUser, User from django.http import QueryDict from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens -from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID +from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.tests.django_asserts import assert_contains from swh.web.tests.utils import ( check_html_get_response, check_http_get_response, check_http_post_response, ) from swh.web.urls import _default_view as homepage_view from . import sample_data from .keycloak_mock import mock_keycloak -@pytest.mark.django_db -def test_oidc_login_views_success(client, mocker): - """ - Simulate a successful login authentication with OpenID Connect - authorization code flow with PKCE. - """ - # mock Keycloak client - kc_oidc_mock = mock_keycloak(mocker) - - # user initiates login process - login_url = reverse("oidc-login") - - # should redirect to Keycloak authentication page in order - # for a user to login with its username / password - response = check_html_get_response(client, login_url, status_code=302) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) - +def _check_oidc_login_code_flow_data( + request, response, kc_oidc_mock, redirect_uri, scope="openid" +): parsed_url = urlparse(response["location"]) authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response["location"], parsed_url.path) == authorization_url assert "client_id" in query_dict assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict - assert query_dict["redirect_uri"] == reverse("oidc-login-complete", request=request) + assert query_dict["redirect_uri"] == redirect_uri assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict - assert query_dict["scope"] == "openid" + assert query_dict["scope"] == scope assert "state" in query_dict assert "code_challenge" in query_dict # check a login_data has been registered in user session assert "login_data" in request.session login_data = request.session["login_data"] assert "code_verifier" in login_data assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] + return login_data + + +@pytest.mark.django_db +def test_oidc_login_views_success(client, mocker): + """ + Simulate a successful login authentication with OpenID Connect + authorization code flow with PKCE. + """ + # mock Keycloak client + kc_oidc_mock = mock_keycloak(mocker) + + # user initiates login process + login_url = reverse("oidc-login") + + # should redirect to Keycloak authentication page in order + # for a user to login with its username / password + response = check_html_get_response(client, login_url, status_code=302) + request = response.wsgi_request + + assert isinstance(request.user, AnonymousUser) + + login_data = _check_oidc_login_code_flow_data( + request, + response, + kc_oidc_mock, + redirect_uri=reverse("oidc-login-complete", request=request), + ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-login-complete' view to # login in Django. # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) login_complete_url = reverse( "oidc-login-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) # login process finalization, should redirect to root url by default response = check_html_get_response(client, login_complete_url, status_code=302) request = response.wsgi_request assert response["location"] == request.build_absolute_uri("/") # user should be authenticated assert isinstance(request.user, OIDCUser) # check remote user has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_oidc_logout_view_success(client, mocker): """ Simulate a successful logout operation with OpenID Connect. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") kc_oidc_mock.authorization_code.assert_called() # user initiates logout oidc_logout_url = reverse("oidc-logout") # should redirect to logout page response = check_html_get_response(client, oidc_logout_url, status_code=302) request = response.wsgi_request logout_url = reverse("logout", query_params={"remote_user": 1}) assert response["location"] == request.build_absolute_uri(logout_url) # should have been logged out in Keycloak kc_oidc_mock.logout.assert_called_with(sample_data.oidc_profile["refresh_token"]) # check effective logout in Django assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_view_failure(client, mocker): """ Simulate a failed authentication with OpenID Connect. """ # mock Keycloak client mock_keycloak(mocker, auth_success=False) # user initiates login process login_url = reverse("oidc-login") # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) request = response.wsgi_request # no users should be logged in assert isinstance(request.user, AnonymousUser) # Simulate possible errors with OpenID Connect in the login complete view. def test_oidc_login_complete_view_no_login_data(client, mocker): # user initiates login process login_url = reverse("oidc-login-complete") # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) assert_contains( response, "Login process has not been initialized.", status_code=500 ) def test_oidc_login_complete_view_missing_parameters(client, mocker): # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", "prompt": "", } session.save() # user initiates login process login_url = reverse("oidc-login-complete") # should render an error page response = check_html_get_response( client, login_url, status_code=400, template_used="error.html" ) request = response.wsgi_request assert_contains( response, "Missing query parameters for authentication.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) def test_oidc_login_complete_wrong_csrf_token(client, mocker): # mock Keycloak client mock_keycloak(mocker) # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", "prompt": "", } session.save() # user initiates login process login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"} ) # should render an error page response = check_html_get_response( client, login_url, status_code=400, template_used="error.html" ) request = response.wsgi_request assert_contains( response, "Wrong CSRF token, aborting login process.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_complete_wrong_code_verifier(client, mocker): # mock Keycloak client mock_keycloak(mocker, auth_success=False) # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", "prompt": "", } session.save() # check authentication error is reported login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": session["login_data"]["state"]}, ) # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) request = response.wsgi_request assert_contains(response, "User authentication failed.", status_code=500) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_logout_view_failure(client, mocker): """ Simulate a failed logout operation with OpenID Connect. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") err_msg = "Authentication server error" kc_oidc_mock.logout.side_effect = Exception(err_msg) # user initiates logout process logout_url = reverse("oidc-logout") # should render an error page response = check_html_get_response( client, logout_url, status_code=500, template_used="error.html" ) request = response.wsgi_request assert_contains(response, err_msg, status_code=500) # user should be logged out from Django anyway assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_silent_refresh_failure(client, mocker): # mock Keycloak client mock_keycloak(mocker) next_path = reverse("swh-web-homepage") # silent session refresh initialization login_url = reverse( "oidc-login", query_params={"next_path": next_path, "prompt": "none"} ) response = check_http_get_response(client, login_url, status_code=302) request = response.wsgi_request login_data = request.session["login_data"] # check prompt value has been registered in user session assert "prompt" in login_data assert login_data["prompt"] == "none" # simulate a failed silent session refresh session_state = str(uuid.uuid4()) login_complete_url = reverse( "oidc-login-complete", query_params={ "error": "login_required", "state": login_data["state"], "session_state": session_state, }, ) # login process finalization, should redirect to logout page response = check_http_get_response(client, login_complete_url, status_code=302) request = response.wsgi_request logout_url = reverse( "logout", query_params={"next_path": next_path, "remote_user": 1} ) assert response["location"] == logout_url def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default assert not hasattr(request, "user") response = homepage_view(request) assert response.status_code == 200 def test_oidc_generate_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-generate-bearer-token") - check_http_post_response(client, url, data={"password": "secret"}, status_code=403) + check_http_get_response(client, url, status_code=403) -def _generate_bearer_token(client, password): +def _generate_and_test_bearer_token(client, kc_oidc_mock): + # user authenticates client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) + # user initiates bearer token generation flow url = reverse("oidc-generate-bearer-token") - return client.post( - url, data={"password": password}, content_type="application/json" + response = check_http_get_response(client, url, status_code=302) + request = response.wsgi_request + redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) + # check login data and redirection to Keycloak is valid + login_data = _check_oidc_login_code_flow_data( + request, + response, + kc_oidc_mock, + redirect_uri=redirect_uri, + scope="openid offline_access", ) + # once a user has identified himself in Keycloak, he is + # redirected to the 'oidc-generate-bearer-token-complete' view + # to get and save bearer token -@pytest.mark.django_db -def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker): - """ - User with correct credentials should be allowed to generate a token. - """ - kc_mock = mock_keycloak(mocker) - password = "secret" - response = _generate_bearer_token(client, password) - user = response.wsgi_request.user - assert response.status_code == 200 - assert response.content.decode("ascii") == kc_mock.offline_token( - username=user.username, password=password + # generate authorization code / session state in the same + # manner as Keycloak + code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" + session_state = str(uuid.uuid4()) + + token_complete_url = reverse( + "oidc-generate-bearer-token-complete", + query_params={ + "code": code, + "state": login_data["state"], + "session_state": session_state, + }, ) + nb_tokens = len(OIDCUserOfflineTokens.objects.all()) + response = check_html_get_response(client, token_complete_url, status_code=302) + request = response.wsgi_request + + # check token has been generated and saved encrypted to database + assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1 + encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token + secret = get_config()["secret_key"].encode() + salt = request.user.sub.encode() + decrypted_token = decrypt_data(encrypted_token, secret, salt) + oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri) + assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"] + + # should redirect to tokens management Web UI + assert response["location"] == reverse("oidc-profile") + "#tokens" + + return decrypted_token + @pytest.mark.django_db -def test_oidc_generate_bearer_token_authenticated_user_failure(client, mocker): +def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker): """ - User with wrong credentials should not be allowed to generate a token. + Authenticated user should be able to generate a bearer token using OIDC + Authorization Code Flow. """ - response_code = 401 - kc_mock = mock_keycloak(mocker) - kc_mock.offline_token.side_effect = KeycloakError( - error_message="Invalid password", response_code=response_code - ) - response = _generate_bearer_token(client, password="invalid-password") - assert response.status_code == response_code + kc_oidc_mock = mock_keycloak(mocker) + _generate_and_test_bearer_token(client, kc_oidc_mock) def test_oidc_list_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) check_http_get_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_list_bearer_tokens(client, mocker): """ User with correct credentials should be allowed to list his tokens. """ - mock_keycloak(mocker) + kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 - password = "secret" for _ in range(nb_tokens): - response = _generate_bearer_token(client, password) + _generate_and_test_bearer_token(client, kc_oidc_mock) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) response = check_http_get_response(client, url, status_code=200) tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"])) for oidc_token in OIDCUserOfflineTokens.objects.all(): assert ( oidc_token.creation_date.isoformat() == tokens_data[oidc_token.id - 1]["creation_date"] ) def test_oidc_get_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-get-bearer-token") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_get_bearer_token(client, mocker): """ User with correct credentials should be allowed to display a token. """ - mock_keycloak(mocker) + kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 - password = "secret" for i in range(nb_tokens): - response = _generate_bearer_token(client, password) - token = response.content + token = _generate_and_test_bearer_token(client, kc_oidc_mock) url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=200, - data={"password": password, "token_id": i + 1}, + data={"token_id": i + 1}, content_type="text/plain", ) assert response.content == token -@pytest.mark.django_db -def test_oidc_get_bearer_token_invalid_password(client, mocker): - """ - User with wrong credentials should not be allowed to display a token. - """ - mock_keycloak(mocker) - password = "secret" - _generate_bearer_token(client, password) - - url = reverse("oidc-get-bearer-token") - check_http_post_response( - client, - url, - status_code=401, - data={"password": "invalid-password", "token_id": 1}, - ) - - def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-revoke-bearer-tokens") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_revoke_bearer_tokens(client, mocker): """ User with correct credentials should be allowed to revoke tokens. """ - mock_keycloak(mocker) + kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 - password = "secret" for _ in range(nb_tokens): - _generate_bearer_token(client, password) + _generate_and_test_bearer_token(client, kc_oidc_mock) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( - client, url, status_code=200, data={"password": password, "token_ids": [1]}, + client, url, status_code=200, data={"token_ids": [1]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 2 check_http_post_response( - client, url, status_code=200, data={"password": password, "token_ids": [2, 3]}, + client, url, status_code=200, data={"token_ids": [2, 3]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 0 -@pytest.mark.django_db -def test_oidc_revoke_bearer_token_invalid_password(client, mocker): - """ - User with wrong credentials should not be allowed to revoke tokens. - """ - mock_keycloak(mocker) - password = "secret" - - _generate_bearer_token(client, password) - - url = reverse("oidc-revoke-bearer-tokens") - - check_http_post_response( - client, - url, - status_code=401, - data={"password": "invalid-password", "token_ids": [1]}, - ) - - def test_oidc_profile_view_anonymous_user(client): """ Non authenticated users should be redirected to login page when requesting profile view. """ url = reverse("oidc-profile") login_url = reverse("oidc-login", query_params={"next_path": url}) resp = check_html_get_response(client, url, status_code=302) assert resp["location"] == login_url @pytest.mark.django_db def test_oidc_profile_view(client, mocker): """ Authenticated users should be able to request the profile page and link to Keycloak account UI should be present. """ url = reverse("oidc-profile") kc_config = get_config()["keycloak"] user_permissions = ["perm1", "perm2"] mock_keycloak(mocker, user_permissions=user_permissions) client.login(code="", code_verifier="", redirect_uri="") resp = check_html_get_response( client, url, status_code=200, template_used="auth/profile.html" ) user = resp.wsgi_request.user kc_account_url = ( f"{kc_config['server_url']}realms/{kc_config['realm_name']}/account/" ) assert_contains(resp, kc_account_url) assert_contains(resp, user.username) assert_contains(resp, user.first_name) assert_contains(resp, user.last_name) assert_contains(resp, user.email) for perm in user_permissions: assert_contains(resp, perm)