diff --git a/assets/src/bundles/auth/index.js b/assets/src/bundles/auth/index.js index 4156b4b9..652736b1 100644 --- a/assets/src/bundles/auth/index.js +++ b/assets/src/bundles/auth/index.js @@ -1,187 +1,193 @@ /** * Copyright (C) 2020 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ import {handleFetchError, csrfPost, removeUrlFragment} from 'utils/functions'; import './auth.css'; let apiTokensTable; function tokenForm(infoText, buttonText) { const form = `<form id="swh-token-form" class="text-center"> <p id="swh-token-form-text">${infoText}</p> <input id="swh-token-form-submit" type="submit" value="${buttonText}"> <div id="swh-token-form-message"></div> </form>`; return form; } function errorMessage(message) { return `<p id="swh-token-error-message" class="mt-3 swh-token-form-message">${message}</p>`; } function successMessage(message) { return `<p id="swh-token-success-message" class="mt-3 swh-token-form-message">${message}</p>`; } function disableSubmitButton() { $('#swh-token-form-submit').prop('disabled', true); } function generateToken() { window.location = Urls.oidc_generate_bearer_token(); } function displayToken(tokenId) { const postData = { token_id: tokenId }; csrfPost(Urls.oidc_get_bearer_token(), {}, JSON.stringify(postData)) .then(handleFetchError) .then(response => response.text()) .then(token => { const tokenHtml = `<p>Below is your token.</p> <pre id="swh-bearer-token" class="mt-3">${token}</pre>`; swh.webapp.showModalHtml('Display bearer token', tokenHtml); }) - .catch(() => { - swh.webapp.showModalHtml('Display bearer token', errorMessage('Internal server error.')); + .catch(response => { + response.text().then(responseText => { + let errorMsg = 'Internal server error.'; + if (response.status === 400) { + errorMsg = responseText; + } + swh.webapp.showModalHtml('Display bearer token', errorMessage(errorMsg)); + }); }); } function revokeTokens(tokenIds) { const postData = { token_ids: tokenIds }; csrfPost(Urls.oidc_revoke_bearer_tokens(), {}, JSON.stringify(postData)) .then(handleFetchError) .then(() => { disableSubmitButton(); $('#swh-token-form-message').html( successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked.`)); apiTokensTable.draw(); }) .catch(() => { $('#swh-token-form-message').html(errorMessage('Internal server error.')); }); } function revokeToken(tokenId) { revokeTokens([tokenId]); } function revokeAllTokens() { const tokenIds = []; const rowsData = apiTokensTable.rows().data(); for (let i = 0; i < rowsData.length; ++i) { tokenIds.push(rowsData[i].id); } revokeTokens(tokenIds); } export function applyTokenAction(action, tokenId) { const actionData = { display: { submitCallback: displayToken }, generate: { modalTitle: 'Bearer token generation', infoText: 'Click on the button to generate the token. You will be redirected to ' + 'Software Heritage Authentication Service and might be asked to enter ' + 'your password again.', buttonText: 'Generate token', submitCallback: generateToken }, revoke: { modalTitle: 'Revoke bearer token', infoText: 'Click on the button to revoke the token.', buttonText: 'Revoke token', submitCallback: revokeToken }, revokeAll: { modalTitle: 'Revoke all bearer tokens', infoText: 'Click on the button to revoke all tokens.', buttonText: 'Revoke tokens', submitCallback: revokeAllTokens } }; if (!actionData[action]) { return; } if (action !== 'display') { const tokenFormHtml = tokenForm( actionData[action].infoText, actionData[action].buttonText); swh.webapp.showModalHtml(actionData[action].modalTitle, tokenFormHtml); $(`#swh-token-form`).submit(event => { event.preventDefault(); event.stopPropagation(); actionData[action].submitCallback(tokenId); }); } else { actionData[action].submitCallback(tokenId); } } export function initProfilePage() { $(document).ready(() => { apiTokensTable = $('#swh-bearer-tokens-table') .on('error.dt', (e, settings, techNote, message) => { $('#swh-origin-save-request-list-error').text( 'An error occurred while retrieving the tokens list'); console.log(message); }) .DataTable({ serverSide: true, ajax: Urls.oidc_list_bearer_tokens(), columns: [ { data: 'creation_date', name: 'creation_date', render: (data, type, row) => { if (type === 'display') { let date = new Date(data); return date.toLocaleString(); } return data; } }, { render: (data, type, row) => { const html = `<button class="btn btn-default" onclick="swh.auth.applyTokenAction('display', ${row.id})"> Display token </button> <button class="btn btn-default" onclick="swh.auth.applyTokenAction('revoke', ${row.id})"> Revoke token </button>`; return html; } } ], ordering: false, searching: false, scrollY: '50vh', scrollCollapse: true }); $('#swh-oidc-profile-tokens-tab').on('shown.bs.tab', () => { apiTokensTable.draw(); window.location.hash = '#tokens'; }); $('#swh-oidc-profile-account-tab').on('shown.bs.tab', () => { removeUrlFragment(); }); if (window.location.hash === '#tokens') { $('.nav-tabs a[href="#swh-oidc-profile-tokens"]').tab('show'); } }); } diff --git a/cypress/integration/api-tokens.spec.js b/cypress/integration/api-tokens.spec.js index 7e1c3a22..2e858b84 100644 --- a/cypress/integration/api-tokens.spec.js +++ b/cypress/integration/api-tokens.spec.js @@ -1,133 +1,142 @@ /** * Copyright (C) 2020-2021 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ describe('Test API tokens UI', function() { it('should ask for user to login', function() { cy.visit(`${this.Urls.oidc_profile()}#tokens`, {failOnStatusCode: false}); cy.location().should(loc => { expect(loc.pathname).to.eq(this.Urls.oidc_login()); }); }); function initTokensPage(Urls, tokens) { cy.intercept(`${Urls.oidc_list_bearer_tokens()}/**`, { body: { 'recordsTotal': tokens.length, 'draw': 2, 'recordsFiltered': tokens.length, 'data': tokens } }); // the tested UI should not be accessible for standard Django users // but we need a user logged in for testing it cy.adminLogin(); cy.visit(`${Urls.oidc_profile()}#tokens`); } it('should initiate token generation flow', function() { initTokensPage(this.Urls, []); cy.contains('Generate new token') .click(); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Bearer token generation'); cy.get('#swh-token-form-submit') .click(); cy.location().should(loc => { expect(loc.pathname).to.eq(this.Urls.oidc_generate_bearer_token()); }); }); it('should report error when not logged in and visiting a token generation URL', function() { cy.visit(this.Urls.oidc_generate_bearer_token_complete(), {failOnStatusCode: false}); cy.get('.swh-http-error') .should('be.visible'); cy.get('.swh-http-error-code') .should('contain', 403); cy.get('.swh-http-error-desc') .should('contain', 'You are not allowed to generate bearer tokens'); }); - function displayToken(Urls, status, tokenValue = '') { + function displayToken(Urls, status, body = '') { cy.intercept('POST', `${Urls.oidc_get_bearer_token()}/**`, { - body: tokenValue, + body: body, statusCode: status }).as('getTokenRequest'); cy.contains('Display token') .click({force: true}); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Display bearer token'); } it('should show a token when requested', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); const tokenValue = 'token-value'; displayToken(this.Urls, 200, tokenValue); cy.get('#swh-bearer-token') .should('contain', tokenValue); }); - it('should report errors when token display failed', function() { + it('should report error when token display failed', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); - displayToken(this.Urls, 500); + const errorMessage = 'Internal server error'; + displayToken(this.Urls, 500, errorMessage); cy.get('.modal-body') - .should('contain', 'Internal server error'); + .should('contain', errorMessage); + }); + + it('should report error when token expired', function() { + initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); + const errorMessage = 'Bearer token has expired'; + displayToken(this.Urls, 400, errorMessage); + cy.get('.modal-body') + .should('contain', errorMessage); }); function revokeToken(Urls, status) { cy.intercept('POST', `${Urls.oidc_revoke_bearer_tokens()}/**`, { body: '', statusCode: status }).as('revokeTokenRequest'); cy.contains('Revoke token') .click({force: true}); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Revoke bearer token'); cy.get('#swh-token-form-submit') .click({force: true}); cy.wait('@revokeTokenRequest'); if (status === 200) { cy.get('#swh-token-form-submit') .should('be.disabled'); } } it('should revoke a token when requested', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); revokeToken(this.Urls, 200); cy.get('#swh-token-form-message') .should('contain', 'Bearer token successfully revoked'); }); it('should report errors when token revoke failed', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); revokeToken(this.Urls, 500); cy.get('#swh-token-error-message') .should('contain', 'Internal server error'); }); }); diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index 1dce5c7b..ecd3ff94 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,173 +1,187 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Union, cast from cryptography.fernet import InvalidToken from django.conf.urls import url from django.contrib.auth.decorators import login_required from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( HttpResponse, + HttpResponseBadRequest, HttpResponseForbidden, HttpResponseRedirect, JsonResponse, ) from django.shortcuts import render from django.views.decorators.http import require_http_methods from swh.auth.django.models import OIDCUser from swh.auth.django.utils import keycloak_oidc_client from swh.auth.django.views import get_oidc_login_data, oidc_login_view from swh.auth.django.views import urlpatterns as auth_urlpatterns +from swh.auth.keycloak import KeycloakError, keycloak_error_message from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import decrypt_data, encrypt_data from swh.web.common.exc import ForbiddenExc from swh.web.common.utils import reverse from swh.web.config import get_config def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) return oidc_login_view( request, redirect_uri=redirect_uri, scope="openid offline_access" ) def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): raise ForbiddenExc("You are not allowed to generate bearer tokens.") if "error" in request.GET: raise Exception(request.GET["error"]) login_data = get_oidc_login_data(request) oidc_client = keycloak_oidc_client() oidc_profile = oidc_client.authorization_code( code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) user = cast(OIDCUser, request.user) token = oidc_profile["refresh_token"] secret = get_config()["secret_key"].encode() salt = user.sub.encode() encrypted_token = encrypt_data(token.encode(), secret, salt) OIDCUserOfflineTokens.objects.create( user_id=str(user.id), offline_token=encrypted_token ).save() return HttpResponseRedirect(reverse("oidc-profile") + "#tokens") def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id)) tokens = tokens.order_by("-creation_date") length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(tokens, length) tokens_data = [ {"id": t.id, "creation_date": t.creation_date.isoformat()} for t in paginator.page(int(page)).object_list ] table_data: Dict[str, Any] = {} table_data["recordsTotal"] = len(tokens_data) table_data["draw"] = int(request.GET["draw"]) table_data["data"] = tokens_data table_data["recordsFiltered"] = len(tokens_data) return JsonResponse(table_data) def _encrypted_token_bytes(token: Union[bytes, memoryview]) -> bytes: # token has been retrieved from a PosgreSQL database if isinstance(token, memoryview): return token.tobytes() else: return token @require_http_methods(["POST"]) def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data( _encrypted_token_bytes(token_data.offline_token), secret, salt ) - return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") + refresh_token = decrypted_token.decode("ascii") + # check token is still valid + oidc_client = keycloak_oidc_client() + oidc_client.refresh_token(refresh_token) + return HttpResponse(refresh_token, content_type="text/plain") except InvalidToken: return HttpResponse(status=401) + except KeycloakError as ke: + error_msg = keycloak_error_message(ke) + if error_msg in ( + "invalid_grant: Offline session not active", + "invalid_grant: Offline user session not found", + ): + error_msg = "Bearer token has expired, please generate a new one." + return HttpResponseBadRequest(error_msg, content_type="text/plain") @require_http_methods(["POST"]) def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data( _encrypted_token_bytes(token_data.offline_token), secret, salt ) oidc_client = keycloak_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) except InvalidToken: return HttpResponse(status=401) @login_required(login_url="/oidc/login/", redirect_field_name="next_path") def _oidc_profile_view(request: HttpRequest) -> HttpResponse: return render(request, "auth/profile.html") urlpatterns = auth_urlpatterns + [ url( r"^oidc/generate-bearer-token/$", oidc_generate_bearer_token, name="oidc-generate-bearer-token", ), url( r"^oidc/generate-bearer-token-complete/$", oidc_generate_bearer_token_complete, name="oidc-generate-bearer-token-complete", ), url( r"^oidc/list-bearer-token/$", oidc_list_bearer_tokens, name="oidc-list-bearer-tokens", ), url( r"^oidc/get-bearer-token/$", oidc_get_bearer_token, name="oidc-get-bearer-token", ), url( r"^oidc/revoke-bearer-tokens/$", oidc_revoke_bearer_tokens, name="oidc-revoke-bearer-tokens", ), url(r"^oidc/profile/$", _oidc_profile_view, name="oidc-profile",), ] diff --git a/swh/web/templates/auth/profile.html b/swh/web/templates/auth/profile.html index 82e023f3..56cc96a2 100644 --- a/swh/web/templates/auth/profile.html +++ b/swh/web/templates/auth/profile.html @@ -1,105 +1,108 @@ {% extends "layout.html" %} {% comment %} Copyright (C) 2020 The Software Heritage developers See the AUTHORS file at the top-level directory of this distribution License: GNU Affero General Public License version 3, or any later version See top-level LICENSE file for more information {% endcomment %} {% load render_bundle from webpack_loader %} {% load swh_templatetags %} {% block title %} User profile – Software Heritage {% endblock %} {% block header %} {% render_bundle 'auth' %} {% endblock %} {% block navbar-content %} <h4>User profile</h4> {% endblock %} {% block content %} <ul class="nav nav-tabs" style="padding-left: 5px;"> <li class="nav-item"> <a class="nav-link active" data-toggle="tab" id="swh-oidc-profile-account-tab" href="#swh-oidc-profile-account">Account</a> </li> <li class="nav-item"> <a class="nav-link" data-toggle="tab" id="swh-oidc-profile-tokens-tab" href="#swh-oidc-profile-tokens">API tokens</a> </li> </ul> <div class="tab-content"> <div id="swh-oidc-profile-account" class="tab-pane active"> <p class="mt-3"> Below are the details of your user account. You can edit your personal information in the <a href="{{ keycloak.server_url }}realms/{{ keycloak.realm_name }}/account/"> Software Heritage Account Management </a> interface. </p> <table class="table swh-table w-100 mt-3 border-top border-bottom"> <tr> <th>Username</th> <td class="border-bottom">{{ user.username }}</td> </tr> <tr> <th>First name</th> <td class="border-bottom">{{ user.first_name }}</td> </tr> <tr> <th>Last name</th> <td class="border-bottom">{{ user.last_name }}</td> </tr> <tr> <th>Email</th> <td class="border-bottom">{{ user.email }}</td> </tr> <tr> <th>Permissions:</th> <td> {% for perm in user.get_all_permissions %} {{ perm }}<br/> {% endfor %} </td> </tr> </table> </div> <div id="swh-oidc-profile-tokens" class="tab-pane"> <p class="mt-3"> That interface enables to manage bearer tokens for Web API authentication. A token has to be sent in HTTP authorization headers to make authenticated API requests. </p> <p> For instance when using <code>curl</code> proceed as follows: <pre>curl -H "Authorization: Bearer ${TOKEN}" {{ site_base_url }}api/...</pre> </p> + <p> + Please not that a bearer token will automatically expire after 30 days of inactivity. + </p> <div class="mt-3"> <div class="float-right"> <button class="btn btn-default" onclick="swh.auth.applyTokenAction('generate')"> Generate new token </button> <button class="btn btn-default float-right" onclick="swh.auth.applyTokenAction('revokeAll')"> Revoke all tokens </button> </div> <table id="swh-bearer-tokens-table" class="table swh-table swh-table-striped" width="100%"> <thead> <tr> <th>Creation date</th> <th>Actions</th> </tr> </thead> </table> </div> </div> </div> <script> swh.auth.initProfilePage(); </script> {% endblock content %} diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index 07436e12..251975a6 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,272 +1,306 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from urllib.parse import urljoin, urlparse import uuid import pytest from django.http import QueryDict +from swh.auth.keycloak import KeycloakError from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.tests.django_asserts import assert_contains from swh.web.tests.utils import ( check_html_get_response, check_http_get_response, check_http_post_response, ) from swh.web.urls import _default_view as homepage_view def _check_oidc_login_code_flow_data( request, response, keycloak_oidc, redirect_uri, scope="openid" ): parsed_url = urlparse(response["location"]) authorization_url = keycloak_oidc.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response["location"], parsed_url.path) == authorization_url assert "client_id" in query_dict assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict assert query_dict["redirect_uri"] == redirect_uri assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict assert query_dict["scope"] == scope assert "state" in query_dict assert "code_challenge" in query_dict # check a login_data has been registered in user session assert "login_data" in request.session login_data = request.session["login_data"] assert "code_verifier" in login_data assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] return login_data def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default assert not hasattr(request, "user") response = homepage_view(request) assert response.status_code == 200 def test_oidc_generate_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-generate-bearer-token") check_http_get_response(client, url, status_code=403) def _generate_and_test_bearer_token(client, kc_oidc_mock): # user authenticates client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) # user initiates bearer token generation flow url = reverse("oidc-generate-bearer-token") response = check_http_get_response(client, url, status_code=302) request = response.wsgi_request redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) # check login data and redirection to Keycloak is valid login_data = _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri=redirect_uri, scope="openid offline_access", ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-generate-bearer-token-complete' view # to get and save bearer token # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) token_complete_url = reverse( "oidc-generate-bearer-token-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) nb_tokens = len(OIDCUserOfflineTokens.objects.all()) response = check_http_get_response(client, token_complete_url, status_code=302) request = response.wsgi_request # check token has been generated and saved encrypted to database assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1 encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token secret = get_config()["secret_key"].encode() salt = request.user.sub.encode() decrypted_token = decrypt_data(encrypted_token, secret, salt) oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri) assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"] # should redirect to tokens management Web UI assert response["location"] == reverse("oidc-profile") + "#tokens" return decrypted_token @pytest.mark.django_db def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_oidc): """ Authenticated user should be able to generate a bearer token using OIDC Authorization Code Flow. """ _generate_and_test_bearer_token(client, keycloak_oidc) def test_oidc_list_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) check_http_get_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_list_bearer_tokens(client, keycloak_oidc): """ User with correct credentials should be allowed to list his tokens. """ nb_tokens = 3 for _ in range(nb_tokens): _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) response = check_http_get_response(client, url, status_code=200) tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"])) for oidc_token in OIDCUserOfflineTokens.objects.all(): assert ( oidc_token.creation_date.isoformat() == tokens_data[oidc_token.id - 1]["creation_date"] ) def test_oidc_get_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-get-bearer-token") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_get_bearer_token(client, keycloak_oidc): """ User with correct credentials should be allowed to display a token. """ nb_tokens = 3 for i in range(nb_tokens): token = _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=200, data={"token_id": i + 1}, content_type="text/plain", ) assert response.content == token +@pytest.mark.django_db +def test_oidc_get_bearer_token_expired_token(client, keycloak_oidc): + """ + User with correct credentials should be allowed to display a token. + """ + + _generate_and_test_bearer_token(client, keycloak_oidc) + + for kc_err_msg in ("Offline session not active", "Offline user session not found"): + + kc_error_dict = { + "error": "invalid_grant", + "error_description": kc_err_msg, + } + + keycloak_oidc.refresh_token.side_effect = KeycloakError( + error_message=json.dumps(kc_error_dict).encode(), response_code=400 + ) + + url = reverse("oidc-get-bearer-token") + + response = check_http_post_response( + client, + url, + status_code=400, + data={"token_id": 1}, + content_type="text/plain", + ) + assert ( + response.content == b"Bearer token has expired, please generate a new one." + ) + + def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-revoke-bearer-tokens") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_revoke_bearer_tokens(client, keycloak_oidc): """ User with correct credentials should be allowed to revoke tokens. """ nb_tokens = 3 for _ in range(nb_tokens): _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( client, url, status_code=200, data={"token_ids": [1]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 2 check_http_post_response( client, url, status_code=200, data={"token_ids": [2, 3]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 0 def test_oidc_profile_view_anonymous_user(client): """ Non authenticated users should be redirected to login page when requesting profile view. """ url = reverse("oidc-profile") login_url = reverse("oidc-login", query_params={"next_path": url}) resp = check_http_get_response(client, url, status_code=302) assert resp["location"] == login_url @pytest.mark.django_db def test_oidc_profile_view(client, keycloak_oidc): """ Authenticated users should be able to request the profile page and link to Keycloak account UI should be present. """ url = reverse("oidc-profile") kc_config = get_config()["keycloak"] user_permissions = ["perm1", "perm2"] keycloak_oidc.user_permissions = user_permissions client.login(code="", code_verifier="", redirect_uri="") resp = check_html_get_response( client, url, status_code=200, template_used="auth/profile.html" ) user = resp.wsgi_request.user kc_account_url = ( f"{kc_config['server_url']}realms/{kc_config['realm_name']}/account/" ) assert_contains(resp, kc_account_url) assert_contains(resp, user.username) assert_contains(resp, user.first_name) assert_contains(resp, user.last_name) assert_contains(resp, user.email) for perm in user_permissions: assert_contains(resp, perm)