Page Menu | Home | Software Heritage

D4692.id16707.diff
No One | Temporary

D4692.id16707.diff

diff --git a/cypress/integration/api-tokens.spec.js b/cypress/integration/api-tokens.spec.js
--- a/cypress/integration/api-tokens.spec.js
+++ b/cypress/integration/api-tokens.spec.js
@@ -32,13 +32,8 @@
cy.visit(`${Urls.oidc_profile()}#tokens`);
}
- function generateToken(Urls, status, tokenValue = '') {
- cy.route({
- method: 'POST',
- url: `${Urls.oidc_generate_bearer_token()}/**`,
- response: tokenValue,
- status: status
- }).as('generateTokenRequest');
+ it('should initiate token generation flow', function() {
+ initTokensPage(this.Urls, []);
cy.contains('Generate new token')
.click();
@@ -49,50 +44,22 @@
cy.get('.modal-header')
.should('contain', 'Bearer token generation');
- cy.get('#swh-user-password-submit')
- .should('be.disabled');
-
- cy.get('#swh-user-password')
- .type('secret');
-
- cy.get('#swh-user-password-submit')
- .should('be.enabled');
-
- cy.get('#swh-user-password-submit')
+ cy.get('#swh-token-form-submit')
.click();
- cy.wait('@generateTokenRequest');
-
- if (status === 200) {
- cy.get('#swh-user-password-submit')
- .should('be.disabled');
- }
- }
-
- it('should generate and display bearer token', function() {
- initTokensPage(this.Urls, []);
- const tokenValue = 'bearer-token-value';
- generateToken(this.Urls, 200, tokenValue);
- cy.get('#swh-token-success-message')
- .should('contain', 'Below is your token');
- cy.get('#swh-bearer-token')
- .should('contain', tokenValue);
+ cy.location().should(loc => {
+ expect(loc.pathname).to.eq(this.Urls.oidc_generate_bearer_token());
+ });
});
- it('should report errors when token generation failed', function() {
- initTokensPage(this.Urls, []);
- generateToken(this.Urls, 400);
- cy.get('#swh-token-error-message')
+ it('should report error when not logged in and visiting a token generation URL', function() {
+ cy.visit(this.Urls.oidc_generate_bearer_token_complete(), {failOnStatusCode: false});
+ cy.get('.swh-http-error')
+ .should('be.visible');
+ cy.get('.swh-http-error-code')
+ .should('contain', 403);
+ cy.get('.swh-http-error-desc')
.should('contain', 'You are not allowed to generate bearer tokens');
- cy.get('#swh-web-modal-html .close').click();
- generateToken(this.Urls, 401);
- cy.get('#swh-token-error-message')
- .should('contain', 'The password is invalid');
- cy.get('#swh-web-modal-html .close').click();
- generateToken(this.Urls, 500);
- cy.get('#swh-token-error-message')
- .should('contain', 'Internal server error');
-
});
function displayToken(Urls, status, tokenValue = '') {
@@ -111,45 +78,20 @@
cy.get('.modal-header')
.should('contain', 'Display bearer token');
-
- cy.get('#swh-user-password-submit')
- .should('be.disabled');
-
- cy.get('#swh-user-password')
- .type('secret', {force: true});
-
- cy.get('#swh-user-password-submit')
- .should('be.enabled');
-
- cy.get('#swh-user-password-submit')
- .click({force: true});
-
- cy.wait('@getTokenRequest');
-
- if (status === 200) {
- cy.get('#swh-user-password-submit')
- .should('be.disabled');
- }
}
it('should show a token when requested', function() {
initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]);
const tokenValue = 'token-value';
displayToken(this.Urls, 200, tokenValue);
- cy.get('#swh-token-success-message')
- .should('contain', 'Below is your token');
cy.get('#swh-bearer-token')
.should('contain', tokenValue);
});
it('should report errors when token display failed', function() {
initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]);
- displayToken(this.Urls, 401);
- cy.get('#swh-token-error-message')
- .should('contain', 'The password is invalid');
- cy.get('#swh-web-modal-html .close').click();
displayToken(this.Urls, 500);
- cy.get('#swh-token-error-message')
+ cy.get('.modal-body')
.should('contain', 'Internal server error');
});
@@ -170,22 +112,13 @@
cy.get('.modal-header')
.should('contain', 'Revoke bearer token');
- cy.get('#swh-user-password-submit')
- .should('be.disabled');
-
- cy.get('#swh-user-password')
- .type('secret', {force: true});
-
- cy.get('#swh-user-password-submit')
- .should('be.enabled');
-
- cy.get('#swh-user-password-submit')
+ cy.get('#swh-token-form-submit')
.click({force: true});
cy.wait('@revokeTokenRequest');
if (status === 200) {
- cy.get('#swh-user-password-submit')
+ cy.get('#swh-token-form-submit')
.should('be.disabled');
}
}
@@ -193,16 +126,12 @@
it('should revoke a token when requested', function() {
initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]);
revokeToken(this.Urls, 200);
- cy.get('#swh-token-success-message')
+ cy.get('#swh-token-form-message')
.should('contain', 'Bearer token successfully revoked');
});
it('should report errors when token revoke failed', function() {
initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]);
- revokeToken(this.Urls, 401);
- cy.get('#swh-token-error-message')
- .should('contain', 'The password is invalid');
- cy.get('#swh-web-modal-html .close').click();
revokeToken(this.Urls, 500);
cy.get('#swh-token-error-message')
.should('contain', 'Internal server error');
diff --git a/swh/web/assets/src/bundles/auth/index.js b/swh/web/assets/src/bundles/auth/index.js
--- a/swh/web/assets/src/bundles/auth/index.js
+++ b/swh/web/assets/src/bundles/auth/index.js
@@ -10,104 +10,64 @@
let apiTokensTable;
-function updateSubmitButtonState() {
- const val = $('#swh-user-password').val();
- $('#swh-user-password-submit').prop('disabled', val.length === 0);
-}
-
-function passwordForm(infoText, buttonText) {
+function tokenForm(infoText, buttonText) {
const form =
- `<form id="swh-password-form">
- <p id="swh-password-form-text">${infoText}</p>
- <label for="swh-user-password">Password:&nbsp;</label>
- <input id="swh-user-password" type="password" name="swh-user-password" required>
- <input id="swh-user-password-submit" type="submit" value="${buttonText}" disabled>
+ `<form id="swh-token-form" class="text-center">
+ <p id="swh-token-form-text">${infoText}</p>
+ <input id="swh-token-form-submit" type="submit" value="${buttonText}">
+ <div id="swh-token-form-message"></div>
</form>`;
return form;
}
function errorMessage(message) {
- return `<p id="swh-token-error-message" class="mt-3">${message}</p>`;
+ return `<p id="swh-token-error-message" class="mt-3 swh-token-form-message">${message}</p>`;
}
function successMessage(message) {
- return `<p id="swh-token-success-message" class="mt-3">${message}</p>`;
+ return `<p id="swh-token-success-message" class="mt-3 swh-token-form-message">${message}</p>`;
}
function disableSubmitButton() {
- $('#swh-user-password-submit').prop('disabled', true);
- $('#swh-user-password').off('change');
- $('#swh-user-password').off('keyup');
+ $('#swh-token-form-submit').prop('disabled', true);
}
function generateToken() {
- csrfPost(Urls.oidc_generate_bearer_token(), {},
- JSON.stringify({password: $('#swh-user-password').val()}))
- .then(handleFetchError)
- .then(response => response.text())
- .then(token => {
- disableSubmitButton();
- const tokenHtml =
- `${successMessage('Below is your token.')}
- <pre id="swh-bearer-token">${token}</pre>`;
- $(`#swh-password-form`).append(tokenHtml);
- apiTokensTable.draw();
- })
- .catch(response => {
- if (response.status === 400) {
- $(`#swh-password-form`).append(
- errorMessage('You are not allowed to generate bearer tokens.'));
- } else if (response.status === 401) {
- $(`#swh-password-form`).append(errorMessage('The password is invalid.'));
- } else {
- $(`#swh-password-form`).append(errorMessage('Internal server error.'));
- }
- });
+ window.location = Urls.oidc_generate_bearer_token();
}
function displayToken(tokenId) {
const postData = {
- password: $('#swh-user-password').val(),
token_id: tokenId
};
csrfPost(Urls.oidc_get_bearer_token(), {}, JSON.stringify(postData))
.then(handleFetchError)
.then(response => response.text())
.then(token => {
- disableSubmitButton();
const tokenHtml =
- `${successMessage('Below is your token.')}
- <pre id="swh-bearer-token">${token}</pre>`;
- $(`#swh-password-form`).append(tokenHtml);
+ `<p>Below is your token.</p>
+ <pre id="swh-bearer-token" class="mt-3">${token}</pre>`;
+ swh.webapp.showModalHtml('Display bearer token', tokenHtml);
})
- .catch(response => {
- if (response.status === 401) {
- $(`#swh-password-form`).append(errorMessage('The password is invalid.'));
- } else {
- $(`#swh-password-form`).append(errorMessage('Internal server error.'));
- }
+ .catch(() => {
+ swh.webapp.showModalHtml('Display bearer token', errorMessage('Internal server error.'));
});
}
function revokeTokens(tokenIds) {
const postData = {
- password: $('#swh-user-password').val(),
token_ids: tokenIds
};
csrfPost(Urls.oidc_revoke_bearer_tokens(), {}, JSON.stringify(postData))
.then(handleFetchError)
.then(() => {
disableSubmitButton();
- $(`#swh-password-form`).append(
- successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked`));
+ $('#swh-token-form-message').html(
+ successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked.`));
apiTokensTable.draw();
})
- .catch(response => {
- if (response.status === 401) {
- $(`#swh-password-form`).append(errorMessage('The password is invalid.'));
- } else {
- $(`#swh-password-form`).append(errorMessage('Internal server error.'));
- }
+ .catch(() => {
+ $('#swh-token-form-message').html(errorMessage('Internal server error.'));
});
}
@@ -126,27 +86,26 @@
export function applyTokenAction(action, tokenId) {
const actionData = {
+ display: {
+ submitCallback: displayToken
+ },
generate: {
modalTitle: 'Bearer token generation',
- infoText: 'Enter your password and click on the button to generate the token.',
+ infoText: 'Click on the button to generate the token. You will be redirected to ' +
+ 'Software Heritage Authentication Service and might be asked to enter ' +
+ 'your password again.',
buttonText: 'Generate token',
submitCallback: generateToken
},
- display: {
- modalTitle: 'Display bearer token',
- infoText: 'Enter your password and click on the button to display the token.',
- buttonText: 'Display token',
- submitCallback: displayToken
- },
revoke: {
modalTitle: 'Revoke bearer token',
- infoText: 'Enter your password and click on the button to revoke the token.',
+ infoText: 'Click on the button to revoke the token.',
buttonText: 'Revoke token',
submitCallback: revokeToken
},
revokeAll: {
modalTitle: 'Revoke all bearer tokens',
- infoText: 'Enter your password and click on the button to revoke all tokens.',
+ infoText: 'Click on the button to revoke all tokens.',
buttonText: 'Revoke tokens',
submitCallback: revokeAllTokens
}
@@ -156,17 +115,19 @@
return;
}
- const passwordFormHtml = passwordForm(
- actionData[action].infoText, actionData[action].buttonText);
+ if (action !== 'display') {
+ const tokenFormHtml = tokenForm(
+ actionData[action].infoText, actionData[action].buttonText);
- swh.webapp.showModalHtml(actionData[action].modalTitle, passwordFormHtml);
- $('#swh-user-password').change(updateSubmitButtonState);
- $('#swh-user-password').keyup(updateSubmitButtonState);
- $(`#swh-password-form`).submit(event => {
- event.preventDefault();
- event.stopPropagation();
+ swh.webapp.showModalHtml(actionData[action].modalTitle, tokenFormHtml);
+ $(`#swh-token-form`).submit(event => {
+ event.preventDefault();
+ event.stopPropagation();
+ actionData[action].submitCallback(tokenId);
+ });
+ } else {
actionData[action].submitCallback(tokenId);
- });
+ }
}
export function initProfilePage() {
diff --git a/swh/web/auth/keycloak.py b/swh/web/auth/keycloak.py
--- a/swh/web/auth/keycloak.py
+++ b/swh/web/auth/keycloak.py
@@ -83,27 +83,6 @@
**extra_params,
)
- def offline_token(self, username: str, password: str) -> str:
- """
- Generate an OpenID Connect offline refresh token.
-
- Offline tokens are a special type of refresh tokens with long-lived period.
- It enables to open a new authenticated session without having to login again.
-
- Args:
- username: username in the Keycloak realm
- password: password associated to the username
-
- Returns:
- An offline refresh token
- """
- return self._keycloak.token(
- grant_type="password",
- scope="openid offline_access",
- username=username,
- password=password,
- )["refresh_token"]
-
def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
"""
Request a new access token from Keycloak using a refresh token.
diff --git a/swh/web/auth/migrations/0002_remove_stored_tokens.py b/swh/web/auth/migrations/0002_remove_stored_tokens.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/migrations/0002_remove_stored_tokens.py
@@ -0,0 +1,30 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from django.db import migrations
+
+
+def _remove_stored_encrypted_tokens(apps, schema_editor):
+    """Delete every stored offline token.
+
+    Tokens saved so far were encrypted with the user password, while the
+    views now decrypt with the application secret key, so existing rows
+    can no longer be decrypted.
+    """
+    # Use the historical model from the migration state instead of importing
+    # the current model class, which may no longer match the schema at this
+    # point of the migration history.
+    tokens_model = apps.get_model("swh.web.auth", "OIDCUserOfflineTokens")
+    tokens_model.objects.all().delete()
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("swh.web.auth", "0001_initial"),
+    ]
+
+    operations = [migrations.RunPython(_remove_stored_encrypted_tokens)]
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
--- a/swh/web/auth/views.py
+++ b/swh/web/auth/views.py
@@ -8,8 +8,6 @@
import uuid
from cryptography.fernet import InvalidToken
-from keycloak.exceptions import KeycloakError
-import sentry_sdk
from django.conf.urls import url
from django.contrib.auth import authenticate, login, logout
@@ -21,7 +19,6 @@
HttpResponse,
HttpResponseForbidden,
HttpResponseRedirect,
- HttpResponseServerError,
JsonResponse,
)
from django.shortcuts import render
@@ -34,17 +31,14 @@
gen_oidc_pkce_codes,
get_oidc_client,
)
-from swh.web.common.exc import BadInputExc
+from swh.web.common.exc import BadInputExc, ForbiddenExc
from swh.web.common.utils import reverse
+from swh.web.config import get_config
-def oidc_login(request: HttpRequest) -> HttpResponse:
- """
- Django view to initiate login process using OpenID Connect.
- """
+def _oidc_login(request: HttpRequest, redirect_uri: str, scope: str = "openid"):
# generate a CSRF token
state = str(uuid.uuid4())
- redirect_uri = reverse("oidc-login-complete", request=request)
code_verifier, code_challenge = gen_oidc_pkce_codes()
@@ -60,7 +54,7 @@
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
- "scope": "openid",
+ "scope": scope,
"prompt": request.GET.get("prompt", ""),
}
@@ -72,26 +66,24 @@
return HttpResponseRedirect(authorization_url)
-def oidc_login_complete(request: HttpRequest) -> HttpResponse:
+def oidc_login(request: HttpRequest) -> HttpResponse:
"""
- Django view to finalize login process using OpenID Connect.
+ Django view to initiate login process using OpenID Connect.
"""
+
+ redirect_uri = reverse("oidc-login-complete", request=request)
+
+ return _oidc_login(request, redirect_uri=redirect_uri)
+
+
+def _get_login_data(request: HttpRequest) -> Dict[str, Any]:
if "login_data" not in request.session:
raise Exception("Login process has not been initialized.")
- login_data = request.session["login_data"]
- next_path = login_data["next_path"] or request.build_absolute_uri("/")
+ return request.session["login_data"]
- if "error" in request.GET:
- if login_data["prompt"] == "none":
- # Silent login failed because OIDC session expired.
- # Redirect to logout page and inform user.
- logout(request)
- logout_url = reverse(
- "logout", query_params={"next_path": next_path, "remote_user": 1}
- )
- return HttpResponseRedirect(logout_url)
- return HttpResponseServerError(request.GET["error"])
+
+def _check_login_data(request: HttpRequest, login_data: Dict[str, Any]):
if "code" not in request.GET or "state" not in request.GET:
raise BadInputExc("Missing query parameters for authentication.")
@@ -102,6 +94,26 @@
if state != login_data["state"]:
raise BadInputExc("Wrong CSRF token, aborting login process.")
+
+def oidc_login_complete(request: HttpRequest) -> HttpResponse:
+ """
+ Django view to finalize login process using OpenID Connect.
+ """
+ login_data = _get_login_data(request)
+ next_path = login_data["next_path"] or request.build_absolute_uri("/")
+ if "error" in request.GET and login_data["prompt"] == "none":
+ # Silent login failed because OIDC session expired.
+ # Redirect to logout page and inform user.
+ logout(request)
+ logout_url = reverse(
+ "logout", query_params={"next_path": next_path, "remote_user": 1}
+ )
+ return HttpResponseRedirect(logout_url)
+ elif "error" in request.GET:
+ raise Exception(request.GET["error"])
+
+ _check_login_data(request, login_data)
+
user = authenticate(
request=request,
code=request.GET["code"],
@@ -136,25 +148,38 @@
return HttpResponseRedirect(request.build_absolute_uri(logout_url))
-@require_http_methods(["POST"])
def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
- try:
- data = json.loads(request.body.decode("utf-8"))
- user = cast(OIDCUser, request.user)
- oidc_client = get_oidc_client()
- token = oidc_client.offline_token(user.username, data["password"])
- password = data["password"].encode()
- salt = user.sub.encode()
- encrypted_token = encrypt_data(token.encode(), password, salt)
- OIDCUserOfflineTokens.objects.create(
- user_id=str(user.id), offline_token=encrypted_token
- ).save()
- return HttpResponse(token, content_type="text/plain")
- except KeycloakError as e:
- sentry_sdk.capture_exception(e)
- return HttpResponse(status=e.response_code or 500)
+ redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request)
+ return _oidc_login(
+ request, redirect_uri=redirect_uri, scope="openid offline_access"
+ )
+
+
+def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse:
+ if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
+ raise ForbiddenExc("You are not allowed to generate bearer tokens.")
+ if "error" in request.GET:
+ raise Exception(request.GET["error"])
+
+ oidc_client = get_oidc_client()
+ login_data = _get_login_data(request)
+ _check_login_data(request, login_data)
+ oidc_profile = oidc_client.authorization_code(
+ code=request.GET["code"],
+ code_verifier=login_data["code_verifier"],
+ redirect_uri=login_data["redirect_uri"],
+ )
+ user = cast(OIDCUser, request.user)
+ token = oidc_profile["refresh_token"]
+ secret = get_config()["secret_key"].encode()
+ salt = user.sub.encode()
+ encrypted_token = encrypt_data(token.encode(), secret, salt)
+ OIDCUserOfflineTokens.objects.create(
+ user_id=str(user.id), offline_token=encrypted_token
+ ).save()
+ return HttpResponseRedirect(reverse("oidc-profile") + "#tokens")
def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse:
@@ -187,12 +212,12 @@
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
try:
- data = json.loads(request.body.decode("utf-8"))
+ data = json.loads(request.body.decode("ascii"))
user = cast(OIDCUser, request.user)
token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"])
- password = data["password"].encode()
+ secret = get_config()["secret_key"].encode()
salt = user.sub.encode()
- decrypted_token = decrypt_data(token_data.offline_token, password, salt)
+ decrypted_token = decrypt_data(token_data.offline_token, secret, salt)
return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain")
except InvalidToken:
return HttpResponse(status=401)
@@ -203,13 +228,13 @@
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
try:
- data = json.loads(request.body.decode("utf-8"))
+ data = json.loads(request.body.decode("ascii"))
user = cast(OIDCUser, request.user)
for token_id in data["token_ids"]:
token_data = OIDCUserOfflineTokens.objects.get(id=token_id)
- password = data["password"].encode()
+ secret = get_config()["secret_key"].encode()
salt = user.sub.encode()
- decrypted_token = decrypt_data(token_data.offline_token, password, salt)
+ decrypted_token = decrypt_data(token_data.offline_token, secret, salt)
oidc_client = get_oidc_client()
oidc_client.logout(decrypted_token.decode("ascii"))
token_data.delete()
@@ -232,6 +257,11 @@
oidc_generate_bearer_token,
name="oidc-generate-bearer-token",
),
+ url(
+ r"^oidc/generate-bearer-token-complete/$",
+ oidc_generate_bearer_token_complete,
+ name="oidc-generate-bearer-token-complete",
+ ),
url(
r"^oidc/list-bearer-token/$",
oidc_list_bearer_tokens,
diff --git a/swh/web/settings/tests.py b/swh/web/settings/tests.py
--- a/swh/web/settings/tests.py
+++ b/swh/web/settings/tests.py
@@ -75,7 +75,7 @@
},
},
"keycloak": {
- "server_url": "http://localhost:8080/auth",
+ "server_url": "http://localhost:8080/auth/",
"realm_name": "SoftwareHeritage",
},
}
diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py
--- a/swh/web/tests/auth/keycloak_mock.py
+++ b/swh/web/tests/auth/keycloak_mock.py
@@ -67,12 +67,10 @@
self.refresh_token = Mock()
self.userinfo = Mock()
self.logout = Mock()
- self.offline_token = Mock()
if auth_success:
self.authorization_code.return_value = copy(oidc_profile)
self.refresh_token.return_value = copy(oidc_profile)
self.userinfo.return_value = copy(userinfo)
- self.offline_token.return_value = oidc_profile["refresh_token"]
else:
self.authorization_url = Mock()
exception = KeycloakError(
@@ -83,7 +81,6 @@
self.refresh_token.side_effect = exception
self.userinfo.side_effect = exception
self.logout.side_effect = exception
- self.offline_token = exception
def decode_token(self, token):
options = {}
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py
--- a/swh/web/tests/auth/test_views.py
+++ b/swh/web/tests/auth/test_views.py
@@ -7,14 +7,13 @@
from urllib.parse import urljoin, urlparse
import uuid
-from keycloak.exceptions import KeycloakError
import pytest
from django.contrib.auth.models import AnonymousUser, User
from django.http import QueryDict
from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens
-from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID
+from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data
from swh.web.common.utils import reverse
from swh.web.config import get_config
from swh.web.tests.django_asserts import assert_contains
@@ -29,25 +28,9 @@
from .keycloak_mock import mock_keycloak
-@pytest.mark.django_db
-def test_oidc_login_views_success(client, mocker):
- """
- Simulate a successful login authentication with OpenID Connect
- authorization code flow with PKCE.
- """
- # mock Keycloak client
- kc_oidc_mock = mock_keycloak(mocker)
-
- # user initiates login process
- login_url = reverse("oidc-login")
-
- # should redirect to Keycloak authentication page in order
- # for a user to login with its username / password
- response = check_html_get_response(client, login_url, status_code=302)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
-
+def _check_oidc_login_code_flow_data(
+ request, response, kc_oidc_mock, redirect_uri, scope="openid"
+):
parsed_url = urlparse(response["location"])
authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"]
@@ -60,11 +43,11 @@
assert "response_type" in query_dict
assert query_dict["response_type"] == "code"
assert "redirect_uri" in query_dict
- assert query_dict["redirect_uri"] == reverse("oidc-login-complete", request=request)
+ assert query_dict["redirect_uri"] == redirect_uri
assert "code_challenge_method" in query_dict
assert query_dict["code_challenge_method"] == "S256"
assert "scope" in query_dict
- assert query_dict["scope"] == "openid"
+ assert query_dict["scope"] == scope
assert "state" in query_dict
assert "code_challenge" in query_dict
@@ -75,6 +58,34 @@
assert "state" in login_data
assert "redirect_uri" in login_data
assert login_data["redirect_uri"] == query_dict["redirect_uri"]
+ return login_data
+
+
+@pytest.mark.django_db
+def test_oidc_login_views_success(client, mocker):
+ """
+ Simulate a successful login authentication with OpenID Connect
+ authorization code flow with PKCE.
+ """
+ # mock Keycloak client
+ kc_oidc_mock = mock_keycloak(mocker)
+
+ # user initiates login process
+ login_url = reverse("oidc-login")
+
+ # should redirect to Keycloak authentication page in order
+ # for a user to login with its username / password
+ response = check_html_get_response(client, login_url, status_code=302)
+ request = response.wsgi_request
+
+ assert isinstance(request.user, AnonymousUser)
+
+ login_data = _check_oidc_login_code_flow_data(
+ request,
+ response,
+ kc_oidc_mock,
+ redirect_uri=reverse("oidc-login-complete", request=request),
+ )
# once a user has identified himself in Keycloak, he is
# redirected to the 'oidc-login-complete' view to
@@ -346,46 +357,73 @@
Anonymous user should be refused access with forbidden response.
"""
url = reverse("oidc-generate-bearer-token")
- check_http_post_response(client, url, data={"password": "secret"}, status_code=403)
+ check_http_get_response(client, url, status_code=403)
-def _generate_bearer_token(client, password):
+def _generate_and_test_bearer_token(client, kc_oidc_mock):
+ # user authenticates
client.login(
code="code", code_verifier="code-verifier", redirect_uri="redirect-uri"
)
+ # user initiates bearer token generation flow
url = reverse("oidc-generate-bearer-token")
- return client.post(
- url, data={"password": password}, content_type="application/json"
+ response = check_http_get_response(client, url, status_code=302)
+ request = response.wsgi_request
+ redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request)
+ # check login data and redirection to Keycloak is valid
+ login_data = _check_oidc_login_code_flow_data(
+ request,
+ response,
+ kc_oidc_mock,
+ redirect_uri=redirect_uri,
+ scope="openid offline_access",
)
+ # once a user has identified himself in Keycloak, he is
+ # redirected to the 'oidc-generate-bearer-token-complete' view
+ # to get and save bearer token
-@pytest.mark.django_db
-def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker):
- """
- User with correct credentials should be allowed to generate a token.
- """
- kc_mock = mock_keycloak(mocker)
- password = "secret"
- response = _generate_bearer_token(client, password)
- user = response.wsgi_request.user
- assert response.status_code == 200
- assert response.content.decode("ascii") == kc_mock.offline_token(
- username=user.username, password=password
+ # generate authorization code / session state in the same
+ # manner as Keycloak
+ code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}"
+ session_state = str(uuid.uuid4())
+
+ token_complete_url = reverse(
+ "oidc-generate-bearer-token-complete",
+ query_params={
+ "code": code,
+ "state": login_data["state"],
+ "session_state": session_state,
+ },
)
+ nb_tokens = len(OIDCUserOfflineTokens.objects.all())
+ response = check_html_get_response(client, token_complete_url, status_code=302)
+ request = response.wsgi_request
+
+ # check token has been generated and saved encrypted to database
+ assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1
+ encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token
+ secret = get_config()["secret_key"].encode()
+ salt = request.user.sub.encode()
+ decrypted_token = decrypt_data(encrypted_token, secret, salt)
+ oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri)
+ assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"]
+
+ # should redirect to tokens management Web UI
+ assert response["location"] == reverse("oidc-profile") + "#tokens"
+
+ return decrypted_token
+
@pytest.mark.django_db
-def test_oidc_generate_bearer_token_authenticated_user_failure(client, mocker):
+def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker):
"""
- User with wrong credentials should not be allowed to generate a token.
+ Authenticated user should be able to generate a bearer token using OIDC
+ Authorization Code Flow.
"""
- response_code = 401
- kc_mock = mock_keycloak(mocker)
- kc_mock.offline_token.side_effect = KeycloakError(
- error_message="Invalid password", response_code=response_code
- )
- response = _generate_bearer_token(client, password="invalid-password")
- assert response.status_code == response_code
+ kc_oidc_mock = mock_keycloak(mocker)
+ _generate_and_test_bearer_token(client, kc_oidc_mock)
def test_oidc_list_bearer_tokens_anonymous_user(client):
@@ -403,12 +441,11 @@
"""
User with correct credentials should be allowed to list his tokens.
"""
- mock_keycloak(mocker)
+ kc_oidc_mock = mock_keycloak(mocker)
nb_tokens = 3
- password = "secret"
for _ in range(nb_tokens):
- response = _generate_bearer_token(client, password)
+ _generate_and_test_bearer_token(client, kc_oidc_mock)
url = reverse(
"oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10}
@@ -437,13 +474,11 @@
"""
User with correct credentials should be allowed to display a token.
"""
- mock_keycloak(mocker)
+ kc_oidc_mock = mock_keycloak(mocker)
nb_tokens = 3
- password = "secret"
for i in range(nb_tokens):
- response = _generate_bearer_token(client, password)
- token = response.content
+ token = _generate_and_test_bearer_token(client, kc_oidc_mock)
url = reverse("oidc-get-bearer-token")
@@ -451,30 +486,12 @@
client,
url,
status_code=200,
- data={"password": password, "token_id": i + 1},
+ data={"token_id": i + 1},
content_type="text/plain",
)
assert response.content == token
-@pytest.mark.django_db
-def test_oidc_get_bearer_token_invalid_password(client, mocker):
- """
- User with wrong credentials should not be allowed to display a token.
- """
- mock_keycloak(mocker)
- password = "secret"
- _generate_bearer_token(client, password)
-
- url = reverse("oidc-get-bearer-token")
- check_http_post_response(
- client,
- url,
- status_code=401,
- data={"password": "invalid-password", "token_id": 1},
- )
-
-
def test_oidc_revoke_bearer_tokens_anonymous_user(client):
"""
Anonymous user should be refused access with forbidden response.
@@ -488,46 +505,25 @@
"""
User with correct credentials should be allowed to revoke tokens.
"""
- mock_keycloak(mocker)
+ kc_oidc_mock = mock_keycloak(mocker)
nb_tokens = 3
- password = "secret"
for _ in range(nb_tokens):
- _generate_bearer_token(client, password)
+ _generate_and_test_bearer_token(client, kc_oidc_mock)
url = reverse("oidc-revoke-bearer-tokens")
check_http_post_response(
- client, url, status_code=200, data={"password": password, "token_ids": [1]},
+ client, url, status_code=200, data={"token_ids": [1]},
)
assert len(OIDCUserOfflineTokens.objects.all()) == 2
check_http_post_response(
- client, url, status_code=200, data={"password": password, "token_ids": [2, 3]},
+ client, url, status_code=200, data={"token_ids": [2, 3]},
)
assert len(OIDCUserOfflineTokens.objects.all()) == 0
-@pytest.mark.django_db
-def test_oidc_revoke_bearer_token_invalid_password(client, mocker):
- """
- User with wrong credentials should not be allowed to revoke tokens.
- """
- mock_keycloak(mocker)
- password = "secret"
-
- _generate_bearer_token(client, password)
-
- url = reverse("oidc-revoke-bearer-tokens")
-
- check_http_post_response(
- client,
- url,
- status_code=401,
- data={"password": "invalid-password", "token_ids": [1]},
- )
-
-
def test_oidc_profile_view_anonymous_user(client):
"""
Non authenticated users should be redirected to login page when

File Metadata

Mime Type
text/plain
Expires
Wed, Sep 17, 4:55 PM (21 h, 53 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3228470

Event Timeline