diff --git a/cypress/integration/api-tokens.spec.js b/cypress/integration/api-tokens.spec.js --- a/cypress/integration/api-tokens.spec.js +++ b/cypress/integration/api-tokens.spec.js @@ -32,13 +32,8 @@ cy.visit(`${Urls.oidc_profile()}#tokens`); } - function generateToken(Urls, status, tokenValue = '') { - cy.route({ - method: 'POST', - url: `${Urls.oidc_generate_bearer_token()}/**`, - response: tokenValue, - status: status - }).as('generateTokenRequest'); + it('should initiate token generation flow', function() { + initTokensPage(this.Urls, []); cy.contains('Generate new token') .click(); @@ -49,50 +44,22 @@ cy.get('.modal-header') .should('contain', 'Bearer token generation'); - cy.get('#swh-user-password-submit') - .should('be.disabled'); - - cy.get('#swh-user-password') - .type('secret'); - - cy.get('#swh-user-password-submit') - .should('be.enabled'); - - cy.get('#swh-user-password-submit') + cy.get('#swh-token-form-submit') .click(); - cy.wait('@generateTokenRequest'); - - if (status === 200) { - cy.get('#swh-user-password-submit') - .should('be.disabled'); - } - } - - it('should generate and display bearer token', function() { - initTokensPage(this.Urls, []); - const tokenValue = 'bearer-token-value'; - generateToken(this.Urls, 200, tokenValue); - cy.get('#swh-token-success-message') - .should('contain', 'Below is your token'); - cy.get('#swh-bearer-token') - .should('contain', tokenValue); + cy.location().should(loc => { + expect(loc.pathname).to.eq(this.Urls.oidc_generate_bearer_token()); + }); }); - it('should report errors when token generation failed', function() { - initTokensPage(this.Urls, []); - generateToken(this.Urls, 400); - cy.get('#swh-token-error-message') + it('should report error when not logged in and visiting a token generation URL', function() { + cy.visit(this.Urls.oidc_generate_bearer_token_complete(), {failOnStatusCode: false}); + cy.get('.swh-http-error') + .should('be.visible'); + cy.get('.swh-http-error-code') + .should('contain', 403); + cy.get('.swh-http-error-desc') .should('contain', 'You are not allowed to generate bearer tokens'); - cy.get('#swh-web-modal-html .close').click(); - generateToken(this.Urls, 401); - cy.get('#swh-token-error-message') - .should('contain', 'The password is invalid'); - cy.get('#swh-web-modal-html .close').click(); - generateToken(this.Urls, 500); - cy.get('#swh-token-error-message') - .should('contain', 'Internal server error'); - }); function displayToken(Urls, status, tokenValue = '') { @@ -111,45 +78,20 @@ cy.get('.modal-header') .should('contain', 'Display bearer token'); - - cy.get('#swh-user-password-submit') - .should('be.disabled'); - - cy.get('#swh-user-password') - .type('secret', {force: true}); - - cy.get('#swh-user-password-submit') - .should('be.enabled'); - - cy.get('#swh-user-password-submit') - .click({force: true}); - - cy.wait('@getTokenRequest'); - - if (status === 200) { - cy.get('#swh-user-password-submit') - .should('be.disabled'); - } } it('should show a token when requested', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); const tokenValue = 'token-value'; displayToken(this.Urls, 200, tokenValue); - cy.get('#swh-token-success-message') - .should('contain', 'Below is your token'); cy.get('#swh-bearer-token') .should('contain', tokenValue); }); it('should report errors when token display failed', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); - displayToken(this.Urls, 401); - cy.get('#swh-token-error-message') - .should('contain', 'The password is invalid'); - cy.get('#swh-web-modal-html .close').click(); displayToken(this.Urls, 500); - cy.get('#swh-token-error-message') + cy.get('.modal-body') .should('contain', 'Internal server error'); }); @@ -170,22 +112,13 @@ cy.get('.modal-header') .should('contain', 'Revoke bearer token'); - cy.get('#swh-user-password-submit') - .should('be.disabled'); - - cy.get('#swh-user-password') - .type('secret', {force: true}); - - cy.get('#swh-user-password-submit') - .should('be.enabled'); - - cy.get('#swh-user-password-submit') + cy.get('#swh-token-form-submit') .click({force: true}); cy.wait('@revokeTokenRequest'); if (status === 200) { - cy.get('#swh-user-password-submit') + cy.get('#swh-token-form-submit') .should('be.disabled'); } } @@ -193,16 +126,12 @@ it('should revoke a token when requested', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); revokeToken(this.Urls, 200); - cy.get('#swh-token-success-message') + cy.get('#swh-token-form-message') .should('contain', 'Bearer token successfully revoked'); }); it('should report errors when token revoke failed', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); - revokeToken(this.Urls, 401); - cy.get('#swh-token-error-message') - .should('contain', 'The password is invalid'); - cy.get('#swh-web-modal-html .close').click(); revokeToken(this.Urls, 500); cy.get('#swh-token-error-message') .should('contain', 'Internal server error'); diff --git a/swh/web/assets/src/bundles/auth/index.js b/swh/web/assets/src/bundles/auth/index.js --- a/swh/web/assets/src/bundles/auth/index.js +++ b/swh/web/assets/src/bundles/auth/index.js @@ -10,104 +10,64 @@ let apiTokensTable; -function updateSubmitButtonState() { - const val = $('#swh-user-password').val(); - $('#swh-user-password-submit').prop('disabled', val.length === 0); -} - -function passwordForm(infoText, buttonText) { +function tokenForm(infoText, buttonText) { const form = - `
-

${infoText}

- - - + ` +

${infoText}

+ +
`; return form; } function errorMessage(message) { - return `

${message}

`; + return `

${message}

`; } function successMessage(message) { - return `

${message}

`; + return `

${message}

`; } function disableSubmitButton() { - $('#swh-user-password-submit').prop('disabled', true); - $('#swh-user-password').off('change'); - $('#swh-user-password').off('keyup'); + $('#swh-token-form-submit').prop('disabled', true); } function generateToken() { - csrfPost(Urls.oidc_generate_bearer_token(), {}, - JSON.stringify({password: $('#swh-user-password').val()})) - .then(handleFetchError) - .then(response => response.text()) - .then(token => { - disableSubmitButton(); - const tokenHtml = - `${successMessage('Below is your token.')} -
${token}
`; - $(`#swh-password-form`).append(tokenHtml); - apiTokensTable.draw(); - }) - .catch(response => { - if (response.status === 400) { - $(`#swh-password-form`).append( - errorMessage('You are not allowed to generate bearer tokens.')); - } else if (response.status === 401) { - $(`#swh-password-form`).append(errorMessage('The password is invalid.')); - } else { - $(`#swh-password-form`).append(errorMessage('Internal server error.')); - } - }); + window.location = Urls.oidc_generate_bearer_token(); } function displayToken(tokenId) { const postData = { - password: $('#swh-user-password').val(), token_id: tokenId }; csrfPost(Urls.oidc_get_bearer_token(), {}, JSON.stringify(postData)) .then(handleFetchError) .then(response => response.text()) .then(token => { - disableSubmitButton(); const tokenHtml = - `${successMessage('Below is your token.')} -
${token}
`; - $(`#swh-password-form`).append(tokenHtml); + `

Below is your token.

+
${token}
`; + swh.webapp.showModalHtml('Display bearer token', tokenHtml); }) - .catch(response => { - if (response.status === 401) { - $(`#swh-password-form`).append(errorMessage('The password is invalid.')); - } else { - $(`#swh-password-form`).append(errorMessage('Internal server error.')); - } + .catch(() => { + swh.webapp.showModalHtml('Display bearer token', errorMessage('Internal server error.')); }); } function revokeTokens(tokenIds) { const postData = { - password: $('#swh-user-password').val(), token_ids: tokenIds }; csrfPost(Urls.oidc_revoke_bearer_tokens(), {}, JSON.stringify(postData)) .then(handleFetchError) .then(() => { disableSubmitButton(); - $(`#swh-password-form`).append( - successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked`)); + $('#swh-token-form-message').html( + successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked.`)); apiTokensTable.draw(); }) - .catch(response => { - if (response.status === 401) { - $(`#swh-password-form`).append(errorMessage('The password is invalid.')); - } else { - $(`#swh-password-form`).append(errorMessage('Internal server error.')); - } + .catch(() => { + $('#swh-token-form-message').html(errorMessage('Internal server error.')); }); } @@ -126,27 +86,26 @@ export function applyTokenAction(action, tokenId) { const actionData = { + display: { + submitCallback: displayToken + }, generate: { modalTitle: 'Bearer token generation', - infoText: 'Enter your password and click on the button to generate the token.', + infoText: 'Click on the button to generate the token. You will be redirected to ' + + 'Software Heritage Authentication Service and might be asked to enter ' + + 'your password again.', buttonText: 'Generate token', submitCallback: generateToken }, - display: { - modalTitle: 'Display bearer token', - infoText: 'Enter your password and click on the button to display the token.', - buttonText: 'Display token', - submitCallback: displayToken - }, revoke: { modalTitle: 'Revoke bearer token', - infoText: 'Enter your password and click on the button to revoke the token.', + infoText: 'Click on the button to revoke the token.', buttonText: 'Revoke token', submitCallback: revokeToken }, revokeAll: { modalTitle: 'Revoke all bearer tokens', - infoText: 'Enter your password and click on the button to revoke all tokens.', + infoText: 'Click on the button to revoke all tokens.', buttonText: 'Revoke tokens', submitCallback: revokeAllTokens } @@ -156,17 +115,19 @@ return; } - const passwordFormHtml = passwordForm( - actionData[action].infoText, actionData[action].buttonText); + if (action !== 'display') { + const tokenFormHtml = tokenForm( + actionData[action].infoText, actionData[action].buttonText); - swh.webapp.showModalHtml(actionData[action].modalTitle, passwordFormHtml); - $('#swh-user-password').change(updateSubmitButtonState); - $('#swh-user-password').keyup(updateSubmitButtonState); - $(`#swh-password-form`).submit(event => { - event.preventDefault(); - event.stopPropagation(); + swh.webapp.showModalHtml(actionData[action].modalTitle, tokenFormHtml); + $(`#swh-token-form`).submit(event => { + event.preventDefault(); + event.stopPropagation(); + actionData[action].submitCallback(tokenId); + }); + } else { actionData[action].submitCallback(tokenId); - }); + } } export function initProfilePage() { diff --git a/swh/web/auth/keycloak.py b/swh/web/auth/keycloak.py --- a/swh/web/auth/keycloak.py +++ b/swh/web/auth/keycloak.py @@ -83,27 +83,6 @@ **extra_params, ) - def offline_token(self, username: str, password: str) -> str: - """ - Generate an OpenID Connect offline refresh token. - - Offline tokens are a special type of refresh tokens with long-lived period. - It enables to open a new authenticated session without having to login again. - - Args: - username: username in the Keycloak realm - password: password associated to the username - - Returns: - An offline refresh token - """ - return self._keycloak.token( - grant_type="password", - scope="openid offline_access", - username=username, - password=password, - )["refresh_token"] - def refresh_token(self, refresh_token: str) -> Dict[str, Any]: """ Request a new access token from Keycloak using a refresh token. diff --git a/swh/web/auth/migrations/0002_remove_stored_tokens.py b/swh/web/auth/migrations/0002_remove_stored_tokens.py new file mode 100644 --- /dev/null +++ b/swh/web/auth/migrations/0002_remove_stored_tokens.py @@ -0,0 +1,22 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from django.db import migrations + +from swh.web.auth.models import OIDCUserOfflineTokens + + +def _remove_stored_encrypted_tokens(apps, schema_editor): + OIDCUserOfflineTokens.objects.all().delete() + + +class Migration(migrations.Migration): + + dependencies = [ + ("swh.web.auth", "0001_initial"), + ] + + operations = [migrations.RunPython(_remove_stored_encrypted_tokens)] diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -8,8 +8,6 @@ import uuid from cryptography.fernet import InvalidToken -from keycloak.exceptions import KeycloakError -import sentry_sdk from django.conf.urls import url from django.contrib.auth import authenticate, login, logout @@ -21,7 +19,6 @@ HttpResponse, HttpResponseForbidden, HttpResponseRedirect, - HttpResponseServerError, JsonResponse, ) from django.shortcuts import render @@ -34,17 +31,14 @@ gen_oidc_pkce_codes, get_oidc_client, ) -from swh.web.common.exc import BadInputExc +from swh.web.common.exc import BadInputExc, ForbiddenExc from swh.web.common.utils import reverse +from swh.web.config import get_config -def oidc_login(request: HttpRequest) -> HttpResponse: - """ - Django view to initiate login process using OpenID Connect. - """ +def _oidc_login(request: HttpRequest, redirect_uri: str, scope: str = "openid"): # generate a CSRF token state = str(uuid.uuid4()) - redirect_uri = reverse("oidc-login-complete", request=request) code_verifier, code_challenge = gen_oidc_pkce_codes() @@ -60,7 +54,7 @@ "state": state, "code_challenge": code_challenge, "code_challenge_method": "S256", - "scope": "openid", + "scope": scope, "prompt": request.GET.get("prompt", ""), } @@ -72,26 +66,24 @@ return HttpResponseRedirect(authorization_url) -def oidc_login_complete(request: HttpRequest) -> HttpResponse: +def oidc_login(request: HttpRequest) -> HttpResponse: """ - Django view to finalize login process using OpenID Connect. + Django view to initiate login process using OpenID Connect. """ + + redirect_uri = reverse("oidc-login-complete", request=request) + + return _oidc_login(request, redirect_uri=redirect_uri) + + +def _get_login_data(request: HttpRequest) -> Dict[str, Any]: if "login_data" not in request.session: raise Exception("Login process has not been initialized.") - login_data = request.session["login_data"] - next_path = login_data["next_path"] or request.build_absolute_uri("/") + return request.session["login_data"] - if "error" in request.GET: - if login_data["prompt"] == "none": - # Silent login failed because OIDC session expired. - # Redirect to logout page and inform user. - logout(request) - logout_url = reverse( - "logout", query_params={"next_path": next_path, "remote_user": 1} - ) - return HttpResponseRedirect(logout_url) - return HttpResponseServerError(request.GET["error"]) + +def _check_login_data(request: HttpRequest, login_data: Dict[str, Any]): if "code" not in request.GET or "state" not in request.GET: raise BadInputExc("Missing query parameters for authentication.") @@ -102,6 +94,26 @@ if state != login_data["state"]: raise BadInputExc("Wrong CSRF token, aborting login process.") + +def oidc_login_complete(request: HttpRequest) -> HttpResponse: + """ + Django view to finalize login process using OpenID Connect. + """ + login_data = _get_login_data(request) + next_path = login_data["next_path"] or request.build_absolute_uri("/") + if "error" in request.GET and login_data["prompt"] == "none": + # Silent login failed because OIDC session expired. + # Redirect to logout page and inform user. + logout(request) + logout_url = reverse( + "logout", query_params={"next_path": next_path, "remote_user": 1} + ) + return HttpResponseRedirect(logout_url) + elif "error" in request.GET: + raise Exception(request.GET["error"]) + + _check_login_data(request, login_data) + user = authenticate( request=request, code=request.GET["code"], @@ -136,25 +148,38 @@ return HttpResponseRedirect(request.build_absolute_uri(logout_url)) -@require_http_methods(["POST"]) def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() - try: - data = json.loads(request.body.decode("utf-8")) - user = cast(OIDCUser, request.user) - oidc_client = get_oidc_client() - token = oidc_client.offline_token(user.username, data["password"]) - password = data["password"].encode() - salt = user.sub.encode() - encrypted_token = encrypt_data(token.encode(), password, salt) - OIDCUserOfflineTokens.objects.create( - user_id=str(user.id), offline_token=encrypted_token - ).save() - return HttpResponse(token, content_type="text/plain") - except KeycloakError as e: - sentry_sdk.capture_exception(e) - return HttpResponse(status=e.response_code or 500) + redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) + return _oidc_login( + request, redirect_uri=redirect_uri, scope="openid offline_access" + ) + + +def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse: + if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): + raise ForbiddenExc("You are not allowed to generate bearer tokens.") + if "error" in request.GET: + raise Exception(request.GET["error"]) + + oidc_client = get_oidc_client() + login_data = _get_login_data(request) + _check_login_data(request, login_data) + oidc_profile = oidc_client.authorization_code( + code=request.GET["code"], + code_verifier=login_data["code_verifier"], + redirect_uri=login_data["redirect_uri"], + ) + user = cast(OIDCUser, request.user) + token = oidc_profile["refresh_token"] + secret = get_config()["secret_key"].encode() + salt = user.sub.encode() + encrypted_token = encrypt_data(token.encode(), secret, salt) + OIDCUserOfflineTokens.objects.create( + user_id=str(user.id), offline_token=encrypted_token + ).save() + return HttpResponseRedirect(reverse("oidc-profile") + "#tokens") def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: @@ -187,12 +212,12 @@ if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: - data = json.loads(request.body.decode("utf-8")) + data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) - password = data["password"].encode() + secret = get_config()["secret_key"].encode() salt = user.sub.encode() - decrypted_token = decrypt_data(token_data.offline_token, password, salt) + decrypted_token = decrypt_data(token_data.offline_token, secret, salt) return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") except InvalidToken: return HttpResponse(status=401) @@ -203,13 +228,13 @@ if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: - data = json.loads(request.body.decode("utf-8")) + data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) - password = data["password"].encode() + secret = get_config()["secret_key"].encode() salt = user.sub.encode() - decrypted_token = decrypt_data(token_data.offline_token, password, salt) + decrypted_token = decrypt_data(token_data.offline_token, secret, salt) oidc_client = get_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() @@ -232,6 +257,11 @@ oidc_generate_bearer_token, name="oidc-generate-bearer-token", ), + url( + r"^oidc/generate-bearer-token-complete/$", + oidc_generate_bearer_token_complete, + name="oidc-generate-bearer-token-complete", + ), url( r"^oidc/list-bearer-token/$", oidc_list_bearer_tokens, diff --git a/swh/web/settings/tests.py b/swh/web/settings/tests.py --- a/swh/web/settings/tests.py +++ b/swh/web/settings/tests.py @@ -75,7 +75,7 @@ }, }, "keycloak": { - "server_url": "http://localhost:8080/auth", + "server_url": "http://localhost:8080/auth/", "realm_name": "SoftwareHeritage", }, } diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py --- a/swh/web/tests/auth/keycloak_mock.py +++ b/swh/web/tests/auth/keycloak_mock.py @@ -67,12 +67,10 @@ self.refresh_token = Mock() self.userinfo = Mock() self.logout = Mock() - self.offline_token = Mock() if auth_success: self.authorization_code.return_value = copy(oidc_profile) self.refresh_token.return_value = copy(oidc_profile) self.userinfo.return_value = copy(userinfo) - self.offline_token.return_value = oidc_profile["refresh_token"] else: self.authorization_url = Mock() exception = KeycloakError( @@ -83,7 +81,6 @@ self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception - self.offline_token = exception def decode_token(self, token): options = {} diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -7,14 +7,13 @@ from urllib.parse import urljoin, urlparse import uuid -from keycloak.exceptions import KeycloakError import pytest from django.contrib.auth.models import AnonymousUser, User from django.http import QueryDict from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens -from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID +from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.tests.django_asserts import assert_contains @@ -29,25 +28,9 @@ from .keycloak_mock import mock_keycloak -@pytest.mark.django_db -def test_oidc_login_views_success(client, mocker): - """ - Simulate a successful login authentication with OpenID Connect - authorization code flow with PKCE. - """ - # mock Keycloak client - kc_oidc_mock = mock_keycloak(mocker) - - # user initiates login process - login_url = reverse("oidc-login") - - # should redirect to Keycloak authentication page in order - # for a user to login with its username / password - response = check_html_get_response(client, login_url, status_code=302) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) - +def _check_oidc_login_code_flow_data( + request, response, kc_oidc_mock, redirect_uri, scope="openid" +): parsed_url = urlparse(response["location"]) authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"] @@ -60,11 +43,11 @@ assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict - assert query_dict["redirect_uri"] == reverse("oidc-login-complete", request=request) + assert query_dict["redirect_uri"] == redirect_uri assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict - assert query_dict["scope"] == "openid" + assert query_dict["scope"] == scope assert "state" in query_dict assert "code_challenge" in query_dict @@ -75,6 +58,34 @@ assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] + return login_data + + +@pytest.mark.django_db +def test_oidc_login_views_success(client, mocker): + """ + Simulate a successful login authentication with OpenID Connect + authorization code flow with PKCE. + """ + # mock Keycloak client + kc_oidc_mock = mock_keycloak(mocker) + + # user initiates login process + login_url = reverse("oidc-login") + + # should redirect to Keycloak authentication page in order + # for a user to login with its username / password + response = check_html_get_response(client, login_url, status_code=302) + request = response.wsgi_request + + assert isinstance(request.user, AnonymousUser) + + login_data = _check_oidc_login_code_flow_data( + request, + response, + kc_oidc_mock, + redirect_uri=reverse("oidc-login-complete", request=request), + ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-login-complete' view to @@ -346,46 +357,73 @@ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-generate-bearer-token") - check_http_post_response(client, url, data={"password": "secret"}, status_code=403) + check_http_get_response(client, url, status_code=403) -def _generate_bearer_token(client, password): +def _generate_and_test_bearer_token(client, kc_oidc_mock): + # user authenticates client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) + # user initiates bearer token generation flow url = reverse("oidc-generate-bearer-token") - return client.post( - url, data={"password": password}, content_type="application/json" + response = check_http_get_response(client, url, status_code=302) + request = response.wsgi_request + redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) + # check login data and redirection to Keycloak is valid + login_data = _check_oidc_login_code_flow_data( + request, + response, + kc_oidc_mock, + redirect_uri=redirect_uri, + scope="openid offline_access", ) + # once a user has identified himself in Keycloak, he is + # redirected to the 'oidc-generate-bearer-token-complete' view + # to get and save bearer token -@pytest.mark.django_db -def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker): - """ - User with correct credentials should be allowed to generate a token. - """ - kc_mock = mock_keycloak(mocker) - password = "secret" - response = _generate_bearer_token(client, password) - user = response.wsgi_request.user - assert response.status_code == 200 - assert response.content.decode("ascii") == kc_mock.offline_token( - username=user.username, password=password + # generate authorization code / session state in the same + # manner as Keycloak + code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" + session_state = str(uuid.uuid4()) + + token_complete_url = reverse( + "oidc-generate-bearer-token-complete", + query_params={ + "code": code, + "state": login_data["state"], + "session_state": session_state, + }, ) + nb_tokens = len(OIDCUserOfflineTokens.objects.all()) + response = check_html_get_response(client, token_complete_url, status_code=302) + request = response.wsgi_request + + # check token has been generated and saved encrypted to database + assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1 + encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token + secret = get_config()["secret_key"].encode() + salt = request.user.sub.encode() + decrypted_token = decrypt_data(encrypted_token, secret, salt) + oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri) + assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"] + + # should redirect to tokens management Web UI + assert response["location"] == reverse("oidc-profile") + "#tokens" + + return decrypted_token + @pytest.mark.django_db -def test_oidc_generate_bearer_token_authenticated_user_failure(client, mocker): +def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker): """ - User with wrong credentials should not be allowed to generate a token. + Authenticated user should be able to generate a bearer token using OIDC + Authorization Code Flow. """ - response_code = 401 - kc_mock = mock_keycloak(mocker) - kc_mock.offline_token.side_effect = KeycloakError( - error_message="Invalid password", response_code=response_code - ) - response = _generate_bearer_token(client, password="invalid-password") - assert response.status_code == response_code + kc_oidc_mock = mock_keycloak(mocker) + _generate_and_test_bearer_token(client, kc_oidc_mock) def test_oidc_list_bearer_tokens_anonymous_user(client): @@ -403,12 +441,11 @@ """ User with correct credentials should be allowed to list his tokens. """ - mock_keycloak(mocker) + kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 - password = "secret" for _ in range(nb_tokens): - response = _generate_bearer_token(client, password) + _generate_and_test_bearer_token(client, kc_oidc_mock) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} @@ -437,13 +474,11 @@ """ User with correct credentials should be allowed to display a token. """ - mock_keycloak(mocker) + kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 - password = "secret" for i in range(nb_tokens): - response = _generate_bearer_token(client, password) - token = response.content + token = _generate_and_test_bearer_token(client, kc_oidc_mock) url = reverse("oidc-get-bearer-token") @@ -451,30 +486,12 @@ client, url, status_code=200, - data={"password": password, "token_id": i + 1}, + data={"token_id": i + 1}, content_type="text/plain", ) assert response.content == token -@pytest.mark.django_db -def test_oidc_get_bearer_token_invalid_password(client, mocker): - """ - User with wrong credentials should not be allowed to display a token. - """ - mock_keycloak(mocker) - password = "secret" - _generate_bearer_token(client, password) - - url = reverse("oidc-get-bearer-token") - check_http_post_response( - client, - url, - status_code=401, - data={"password": "invalid-password", "token_id": 1}, - ) - - def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. @@ -488,46 +505,25 @@ """ User with correct credentials should be allowed to revoke tokens. """ - mock_keycloak(mocker) + kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 - password = "secret" for _ in range(nb_tokens): - _generate_bearer_token(client, password) + _generate_and_test_bearer_token(client, kc_oidc_mock) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( - client, url, status_code=200, data={"password": password, "token_ids": [1]}, + client, url, status_code=200, data={"token_ids": [1]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 2 check_http_post_response( - client, url, status_code=200, data={"password": password, "token_ids": [2, 3]}, + client, url, status_code=200, data={"token_ids": [2, 3]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 0 -@pytest.mark.django_db -def test_oidc_revoke_bearer_token_invalid_password(client, mocker): - """ - User with wrong credentials should not be allowed to revoke tokens. - """ - mock_keycloak(mocker) - password = "secret" - - _generate_bearer_token(client, password) - - url = reverse("oidc-revoke-bearer-tokens") - - check_http_post_response( - client, - url, - status_code=401, - data={"password": "invalid-password", "token_ids": [1]}, - ) - - def test_oidc_profile_view_anonymous_user(client): """ Non authenticated users should be redirected to login page when