Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F11023656
D4692.id16707.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
34 KB
Subscribers
None
D4692.id16707.diff
View Options
diff --git a/cypress/integration/api-tokens.spec.js b/cypress/integration/api-tokens.spec.js
--- a/cypress/integration/api-tokens.spec.js
+++ b/cypress/integration/api-tokens.spec.js
@@ -32,13 +32,8 @@
cy.visit(`${Urls.oidc_profile()}#tokens`);
}
- function generateToken(Urls, status, tokenValue = '') {
- cy.route({
- method: 'POST',
- url: `${Urls.oidc_generate_bearer_token()}/**`,
- response: tokenValue,
- status: status
- }).as('generateTokenRequest');
+ it('should initiate token generation flow', function() {
+ initTokensPage(this.Urls, []);
cy.contains('Generate new token')
.click();
@@ -49,50 +44,22 @@
cy.get('.modal-header')
.should('contain', 'Bearer token generation');
- cy.get('#swh-user-password-submit')
- .should('be.disabled');
-
- cy.get('#swh-user-password')
- .type('secret');
-
- cy.get('#swh-user-password-submit')
- .should('be.enabled');
-
- cy.get('#swh-user-password-submit')
+ cy.get('#swh-token-form-submit')
.click();
- cy.wait('@generateTokenRequest');
-
- if (status === 200) {
- cy.get('#swh-user-password-submit')
- .should('be.disabled');
- }
- }
-
- it('should generate and display bearer token', function() {
- initTokensPage(this.Urls, []);
- const tokenValue = 'bearer-token-value';
- generateToken(this.Urls, 200, tokenValue);
- cy.get('#swh-token-success-message')
- .should('contain', 'Below is your token');
- cy.get('#swh-bearer-token')
- .should('contain', tokenValue);
+ cy.location().should(loc => {
+ expect(loc.pathname).to.eq(this.Urls.oidc_generate_bearer_token());
+ });
});
- it('should report errors when token generation failed', function() {
- initTokensPage(this.Urls, []);
- generateToken(this.Urls, 400);
- cy.get('#swh-token-error-message')
+ it('should report error when not logged in and visiting a token generation URL', function() {
+ cy.visit(this.Urls.oidc_generate_bearer_token_complete(), {failOnStatusCode: false});
+ cy.get('.swh-http-error')
+ .should('be.visible');
+ cy.get('.swh-http-error-code')
+ .should('contain', 403);
+ cy.get('.swh-http-error-desc')
.should('contain', 'You are not allowed to generate bearer tokens');
- cy.get('#swh-web-modal-html .close').click();
- generateToken(this.Urls, 401);
- cy.get('#swh-token-error-message')
- .should('contain', 'The password is invalid');
- cy.get('#swh-web-modal-html .close').click();
- generateToken(this.Urls, 500);
- cy.get('#swh-token-error-message')
- .should('contain', 'Internal server error');
-
});
function displayToken(Urls, status, tokenValue = '') {
@@ -111,45 +78,20 @@
cy.get('.modal-header')
.should('contain', 'Display bearer token');
-
- cy.get('#swh-user-password-submit')
- .should('be.disabled');
-
- cy.get('#swh-user-password')
- .type('secret', {force: true});
-
- cy.get('#swh-user-password-submit')
- .should('be.enabled');
-
- cy.get('#swh-user-password-submit')
- .click({force: true});
-
- cy.wait('@getTokenRequest');
-
- if (status === 200) {
- cy.get('#swh-user-password-submit')
- .should('be.disabled');
- }
}
it('should show a token when requested', function() {
initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]);
const tokenValue = 'token-value';
displayToken(this.Urls, 200, tokenValue);
- cy.get('#swh-token-success-message')
- .should('contain', 'Below is your token');
cy.get('#swh-bearer-token')
.should('contain', tokenValue);
});
it('should report errors when token display failed', function() {
initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]);
- displayToken(this.Urls, 401);
- cy.get('#swh-token-error-message')
- .should('contain', 'The password is invalid');
- cy.get('#swh-web-modal-html .close').click();
displayToken(this.Urls, 500);
- cy.get('#swh-token-error-message')
+ cy.get('.modal-body')
.should('contain', 'Internal server error');
});
@@ -170,22 +112,13 @@
cy.get('.modal-header')
.should('contain', 'Revoke bearer token');
- cy.get('#swh-user-password-submit')
- .should('be.disabled');
-
- cy.get('#swh-user-password')
- .type('secret', {force: true});
-
- cy.get('#swh-user-password-submit')
- .should('be.enabled');
-
- cy.get('#swh-user-password-submit')
+ cy.get('#swh-token-form-submit')
.click({force: true});
cy.wait('@revokeTokenRequest');
if (status === 200) {
- cy.get('#swh-user-password-submit')
+ cy.get('#swh-token-form-submit')
.should('be.disabled');
}
}
@@ -193,16 +126,12 @@
it('should revoke a token when requested', function() {
initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]);
revokeToken(this.Urls, 200);
- cy.get('#swh-token-success-message')
+ cy.get('#swh-token-form-message')
.should('contain', 'Bearer token successfully revoked');
});
it('should report errors when token revoke failed', function() {
initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]);
- revokeToken(this.Urls, 401);
- cy.get('#swh-token-error-message')
- .should('contain', 'The password is invalid');
- cy.get('#swh-web-modal-html .close').click();
revokeToken(this.Urls, 500);
cy.get('#swh-token-error-message')
.should('contain', 'Internal server error');
diff --git a/swh/web/assets/src/bundles/auth/index.js b/swh/web/assets/src/bundles/auth/index.js
--- a/swh/web/assets/src/bundles/auth/index.js
+++ b/swh/web/assets/src/bundles/auth/index.js
@@ -10,104 +10,64 @@
let apiTokensTable;
-function updateSubmitButtonState() {
- const val = $('#swh-user-password').val();
- $('#swh-user-password-submit').prop('disabled', val.length === 0);
-}
-
-function passwordForm(infoText, buttonText) {
+function tokenForm(infoText, buttonText) {
const form =
- `<form id="swh-password-form">
- <p id="swh-password-form-text">${infoText}</p>
- <label for="swh-user-password">Password: </label>
- <input id="swh-user-password" type="password" name="swh-user-password" required>
- <input id="swh-user-password-submit" type="submit" value="${buttonText}" disabled>
+ `<form id="swh-token-form" class="text-center">
+ <p id="swh-token-form-text">${infoText}</p>
+ <input id="swh-token-form-submit" type="submit" value="${buttonText}">
+ <div id="swh-token-form-message"></div>
</form>`;
return form;
}
function errorMessage(message) {
- return `<p id="swh-token-error-message" class="mt-3">${message}</p>`;
+ return `<p id="swh-token-error-message" class="mt-3 swh-token-form-message">${message}</p>`;
}
function successMessage(message) {
- return `<p id="swh-token-success-message" class="mt-3">${message}</p>`;
+ return `<p id="swh-token-success-message" class="mt-3 swh-token-form-message">${message}</p>`;
}
function disableSubmitButton() {
- $('#swh-user-password-submit').prop('disabled', true);
- $('#swh-user-password').off('change');
- $('#swh-user-password').off('keyup');
+ $('#swh-token-form-submit').prop('disabled', true);
}
function generateToken() {
- csrfPost(Urls.oidc_generate_bearer_token(), {},
- JSON.stringify({password: $('#swh-user-password').val()}))
- .then(handleFetchError)
- .then(response => response.text())
- .then(token => {
- disableSubmitButton();
- const tokenHtml =
- `${successMessage('Below is your token.')}
- <pre id="swh-bearer-token">${token}</pre>`;
- $(`#swh-password-form`).append(tokenHtml);
- apiTokensTable.draw();
- })
- .catch(response => {
- if (response.status === 400) {
- $(`#swh-password-form`).append(
- errorMessage('You are not allowed to generate bearer tokens.'));
- } else if (response.status === 401) {
- $(`#swh-password-form`).append(errorMessage('The password is invalid.'));
- } else {
- $(`#swh-password-form`).append(errorMessage('Internal server error.'));
- }
- });
+ window.location = Urls.oidc_generate_bearer_token();
}
function displayToken(tokenId) {
const postData = {
- password: $('#swh-user-password').val(),
token_id: tokenId
};
csrfPost(Urls.oidc_get_bearer_token(), {}, JSON.stringify(postData))
.then(handleFetchError)
.then(response => response.text())
.then(token => {
- disableSubmitButton();
const tokenHtml =
- `${successMessage('Below is your token.')}
- <pre id="swh-bearer-token">${token}</pre>`;
- $(`#swh-password-form`).append(tokenHtml);
+ `<p>Below is your token.</p>
+ <pre id="swh-bearer-token" class="mt-3">${token}</pre>`;
+ swh.webapp.showModalHtml('Display bearer token', tokenHtml);
})
- .catch(response => {
- if (response.status === 401) {
- $(`#swh-password-form`).append(errorMessage('The password is invalid.'));
- } else {
- $(`#swh-password-form`).append(errorMessage('Internal server error.'));
- }
+ .catch(() => {
+ swh.webapp.showModalHtml('Display bearer token', errorMessage('Internal server error.'));
});
}
function revokeTokens(tokenIds) {
const postData = {
- password: $('#swh-user-password').val(),
token_ids: tokenIds
};
csrfPost(Urls.oidc_revoke_bearer_tokens(), {}, JSON.stringify(postData))
.then(handleFetchError)
.then(() => {
disableSubmitButton();
- $(`#swh-password-form`).append(
- successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked`));
+ $('#swh-token-form-message').html(
+ successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked.`));
apiTokensTable.draw();
})
- .catch(response => {
- if (response.status === 401) {
- $(`#swh-password-form`).append(errorMessage('The password is invalid.'));
- } else {
- $(`#swh-password-form`).append(errorMessage('Internal server error.'));
- }
+ .catch(() => {
+ $('#swh-token-form-message').html(errorMessage('Internal server error.'));
});
}
@@ -126,27 +86,26 @@
export function applyTokenAction(action, tokenId) {
const actionData = {
+ display: {
+ submitCallback: displayToken
+ },
generate: {
modalTitle: 'Bearer token generation',
- infoText: 'Enter your password and click on the button to generate the token.',
+ infoText: 'Click on the button to generate the token. You will be redirected to ' +
+ 'Software Heritage Authentication Service and might be asked to enter ' +
+ 'your password again.',
buttonText: 'Generate token',
submitCallback: generateToken
},
- display: {
- modalTitle: 'Display bearer token',
- infoText: 'Enter your password and click on the button to display the token.',
- buttonText: 'Display token',
- submitCallback: displayToken
- },
revoke: {
modalTitle: 'Revoke bearer token',
- infoText: 'Enter your password and click on the button to revoke the token.',
+ infoText: 'Click on the button to revoke the token.',
buttonText: 'Revoke token',
submitCallback: revokeToken
},
revokeAll: {
modalTitle: 'Revoke all bearer tokens',
- infoText: 'Enter your password and click on the button to revoke all tokens.',
+ infoText: 'Click on the button to revoke all tokens.',
buttonText: 'Revoke tokens',
submitCallback: revokeAllTokens
}
@@ -156,17 +115,19 @@
return;
}
- const passwordFormHtml = passwordForm(
- actionData[action].infoText, actionData[action].buttonText);
+ if (action !== 'display') {
+ const tokenFormHtml = tokenForm(
+ actionData[action].infoText, actionData[action].buttonText);
- swh.webapp.showModalHtml(actionData[action].modalTitle, passwordFormHtml);
- $('#swh-user-password').change(updateSubmitButtonState);
- $('#swh-user-password').keyup(updateSubmitButtonState);
- $(`#swh-password-form`).submit(event => {
- event.preventDefault();
- event.stopPropagation();
+ swh.webapp.showModalHtml(actionData[action].modalTitle, tokenFormHtml);
+ $(`#swh-token-form`).submit(event => {
+ event.preventDefault();
+ event.stopPropagation();
+ actionData[action].submitCallback(tokenId);
+ });
+ } else {
actionData[action].submitCallback(tokenId);
- });
+ }
}
export function initProfilePage() {
diff --git a/swh/web/auth/keycloak.py b/swh/web/auth/keycloak.py
--- a/swh/web/auth/keycloak.py
+++ b/swh/web/auth/keycloak.py
@@ -83,27 +83,6 @@
**extra_params,
)
- def offline_token(self, username: str, password: str) -> str:
- """
- Generate an OpenID Connect offline refresh token.
-
- Offline tokens are a special type of refresh tokens with long-lived period.
- It enables to open a new authenticated session without having to login again.
-
- Args:
- username: username in the Keycloak realm
- password: password associated to the username
-
- Returns:
- An offline refresh token
- """
- return self._keycloak.token(
- grant_type="password",
- scope="openid offline_access",
- username=username,
- password=password,
- )["refresh_token"]
-
def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
"""
Request a new access token from Keycloak using a refresh token.
diff --git a/swh/web/auth/migrations/0002_remove_stored_tokens.py b/swh/web/auth/migrations/0002_remove_stored_tokens.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/migrations/0002_remove_stored_tokens.py
@@ -0,0 +1,22 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from django.db import migrations
+
+from swh.web.auth.models import OIDCUserOfflineTokens
+
+
+def _remove_stored_encrypted_tokens(apps, schema_editor):
+ OIDCUserOfflineTokens.objects.all().delete()
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("swh.web.auth", "0001_initial"),
+ ]
+
+ operations = [migrations.RunPython(_remove_stored_encrypted_tokens)]
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
--- a/swh/web/auth/views.py
+++ b/swh/web/auth/views.py
@@ -8,8 +8,6 @@
import uuid
from cryptography.fernet import InvalidToken
-from keycloak.exceptions import KeycloakError
-import sentry_sdk
from django.conf.urls import url
from django.contrib.auth import authenticate, login, logout
@@ -21,7 +19,6 @@
HttpResponse,
HttpResponseForbidden,
HttpResponseRedirect,
- HttpResponseServerError,
JsonResponse,
)
from django.shortcuts import render
@@ -34,17 +31,14 @@
gen_oidc_pkce_codes,
get_oidc_client,
)
-from swh.web.common.exc import BadInputExc
+from swh.web.common.exc import BadInputExc, ForbiddenExc
from swh.web.common.utils import reverse
+from swh.web.config import get_config
-def oidc_login(request: HttpRequest) -> HttpResponse:
- """
- Django view to initiate login process using OpenID Connect.
- """
+def _oidc_login(request: HttpRequest, redirect_uri: str, scope: str = "openid"):
# generate a CSRF token
state = str(uuid.uuid4())
- redirect_uri = reverse("oidc-login-complete", request=request)
code_verifier, code_challenge = gen_oidc_pkce_codes()
@@ -60,7 +54,7 @@
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
- "scope": "openid",
+ "scope": scope,
"prompt": request.GET.get("prompt", ""),
}
@@ -72,26 +66,24 @@
return HttpResponseRedirect(authorization_url)
-def oidc_login_complete(request: HttpRequest) -> HttpResponse:
+def oidc_login(request: HttpRequest) -> HttpResponse:
"""
- Django view to finalize login process using OpenID Connect.
+ Django view to initiate login process using OpenID Connect.
"""
+
+ redirect_uri = reverse("oidc-login-complete", request=request)
+
+ return _oidc_login(request, redirect_uri=redirect_uri)
+
+
+def _get_login_data(request: HttpRequest) -> Dict[str, Any]:
if "login_data" not in request.session:
raise Exception("Login process has not been initialized.")
- login_data = request.session["login_data"]
- next_path = login_data["next_path"] or request.build_absolute_uri("/")
+ return request.session["login_data"]
- if "error" in request.GET:
- if login_data["prompt"] == "none":
- # Silent login failed because OIDC session expired.
- # Redirect to logout page and inform user.
- logout(request)
- logout_url = reverse(
- "logout", query_params={"next_path": next_path, "remote_user": 1}
- )
- return HttpResponseRedirect(logout_url)
- return HttpResponseServerError(request.GET["error"])
+
+def _check_login_data(request: HttpRequest, login_data: Dict[str, Any]):
if "code" not in request.GET or "state" not in request.GET:
raise BadInputExc("Missing query parameters for authentication.")
@@ -102,6 +94,26 @@
if state != login_data["state"]:
raise BadInputExc("Wrong CSRF token, aborting login process.")
+
+def oidc_login_complete(request: HttpRequest) -> HttpResponse:
+ """
+ Django view to finalize login process using OpenID Connect.
+ """
+ login_data = _get_login_data(request)
+ next_path = login_data["next_path"] or request.build_absolute_uri("/")
+ if "error" in request.GET and login_data["prompt"] == "none":
+ # Silent login failed because OIDC session expired.
+ # Redirect to logout page and inform user.
+ logout(request)
+ logout_url = reverse(
+ "logout", query_params={"next_path": next_path, "remote_user": 1}
+ )
+ return HttpResponseRedirect(logout_url)
+ elif "error" in request.GET:
+ raise Exception(request.GET["error"])
+
+ _check_login_data(request, login_data)
+
user = authenticate(
request=request,
code=request.GET["code"],
@@ -136,25 +148,38 @@
return HttpResponseRedirect(request.build_absolute_uri(logout_url))
-@require_http_methods(["POST"])
def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
- try:
- data = json.loads(request.body.decode("utf-8"))
- user = cast(OIDCUser, request.user)
- oidc_client = get_oidc_client()
- token = oidc_client.offline_token(user.username, data["password"])
- password = data["password"].encode()
- salt = user.sub.encode()
- encrypted_token = encrypt_data(token.encode(), password, salt)
- OIDCUserOfflineTokens.objects.create(
- user_id=str(user.id), offline_token=encrypted_token
- ).save()
- return HttpResponse(token, content_type="text/plain")
- except KeycloakError as e:
- sentry_sdk.capture_exception(e)
- return HttpResponse(status=e.response_code or 500)
+ redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request)
+ return _oidc_login(
+ request, redirect_uri=redirect_uri, scope="openid offline_access"
+ )
+
+
+def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse:
+ if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
+ raise ForbiddenExc("You are not allowed to generate bearer tokens.")
+ if "error" in request.GET:
+ raise Exception(request.GET["error"])
+
+ oidc_client = get_oidc_client()
+ login_data = _get_login_data(request)
+ _check_login_data(request, login_data)
+ oidc_profile = oidc_client.authorization_code(
+ code=request.GET["code"],
+ code_verifier=login_data["code_verifier"],
+ redirect_uri=login_data["redirect_uri"],
+ )
+ user = cast(OIDCUser, request.user)
+ token = oidc_profile["refresh_token"]
+ secret = get_config()["secret_key"].encode()
+ salt = user.sub.encode()
+ encrypted_token = encrypt_data(token.encode(), secret, salt)
+ OIDCUserOfflineTokens.objects.create(
+ user_id=str(user.id), offline_token=encrypted_token
+ ).save()
+ return HttpResponseRedirect(reverse("oidc-profile") + "#tokens")
def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse:
@@ -187,12 +212,12 @@
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
try:
- data = json.loads(request.body.decode("utf-8"))
+ data = json.loads(request.body.decode("ascii"))
user = cast(OIDCUser, request.user)
token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"])
- password = data["password"].encode()
+ secret = get_config()["secret_key"].encode()
salt = user.sub.encode()
- decrypted_token = decrypt_data(token_data.offline_token, password, salt)
+ decrypted_token = decrypt_data(token_data.offline_token, secret, salt)
return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain")
except InvalidToken:
return HttpResponse(status=401)
@@ -203,13 +228,13 @@
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
try:
- data = json.loads(request.body.decode("utf-8"))
+ data = json.loads(request.body.decode("ascii"))
user = cast(OIDCUser, request.user)
for token_id in data["token_ids"]:
token_data = OIDCUserOfflineTokens.objects.get(id=token_id)
- password = data["password"].encode()
+ secret = get_config()["secret_key"].encode()
salt = user.sub.encode()
- decrypted_token = decrypt_data(token_data.offline_token, password, salt)
+ decrypted_token = decrypt_data(token_data.offline_token, secret, salt)
oidc_client = get_oidc_client()
oidc_client.logout(decrypted_token.decode("ascii"))
token_data.delete()
@@ -232,6 +257,11 @@
oidc_generate_bearer_token,
name="oidc-generate-bearer-token",
),
+ url(
+ r"^oidc/generate-bearer-token-complete/$",
+ oidc_generate_bearer_token_complete,
+ name="oidc-generate-bearer-token-complete",
+ ),
url(
r"^oidc/list-bearer-token/$",
oidc_list_bearer_tokens,
diff --git a/swh/web/settings/tests.py b/swh/web/settings/tests.py
--- a/swh/web/settings/tests.py
+++ b/swh/web/settings/tests.py
@@ -75,7 +75,7 @@
},
},
"keycloak": {
- "server_url": "http://localhost:8080/auth",
+ "server_url": "http://localhost:8080/auth/",
"realm_name": "SoftwareHeritage",
},
}
diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py
--- a/swh/web/tests/auth/keycloak_mock.py
+++ b/swh/web/tests/auth/keycloak_mock.py
@@ -67,12 +67,10 @@
self.refresh_token = Mock()
self.userinfo = Mock()
self.logout = Mock()
- self.offline_token = Mock()
if auth_success:
self.authorization_code.return_value = copy(oidc_profile)
self.refresh_token.return_value = copy(oidc_profile)
self.userinfo.return_value = copy(userinfo)
- self.offline_token.return_value = oidc_profile["refresh_token"]
else:
self.authorization_url = Mock()
exception = KeycloakError(
@@ -83,7 +81,6 @@
self.refresh_token.side_effect = exception
self.userinfo.side_effect = exception
self.logout.side_effect = exception
- self.offline_token = exception
def decode_token(self, token):
options = {}
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py
--- a/swh/web/tests/auth/test_views.py
+++ b/swh/web/tests/auth/test_views.py
@@ -7,14 +7,13 @@
from urllib.parse import urljoin, urlparse
import uuid
-from keycloak.exceptions import KeycloakError
import pytest
from django.contrib.auth.models import AnonymousUser, User
from django.http import QueryDict
from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens
-from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID
+from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data
from swh.web.common.utils import reverse
from swh.web.config import get_config
from swh.web.tests.django_asserts import assert_contains
@@ -29,25 +28,9 @@
from .keycloak_mock import mock_keycloak
-@pytest.mark.django_db
-def test_oidc_login_views_success(client, mocker):
- """
- Simulate a successful login authentication with OpenID Connect
- authorization code flow with PKCE.
- """
- # mock Keycloak client
- kc_oidc_mock = mock_keycloak(mocker)
-
- # user initiates login process
- login_url = reverse("oidc-login")
-
- # should redirect to Keycloak authentication page in order
- # for a user to login with its username / password
- response = check_html_get_response(client, login_url, status_code=302)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
-
+def _check_oidc_login_code_flow_data(
+ request, response, kc_oidc_mock, redirect_uri, scope="openid"
+):
parsed_url = urlparse(response["location"])
authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"]
@@ -60,11 +43,11 @@
assert "response_type" in query_dict
assert query_dict["response_type"] == "code"
assert "redirect_uri" in query_dict
- assert query_dict["redirect_uri"] == reverse("oidc-login-complete", request=request)
+ assert query_dict["redirect_uri"] == redirect_uri
assert "code_challenge_method" in query_dict
assert query_dict["code_challenge_method"] == "S256"
assert "scope" in query_dict
- assert query_dict["scope"] == "openid"
+ assert query_dict["scope"] == scope
assert "state" in query_dict
assert "code_challenge" in query_dict
@@ -75,6 +58,34 @@
assert "state" in login_data
assert "redirect_uri" in login_data
assert login_data["redirect_uri"] == query_dict["redirect_uri"]
+ return login_data
+
+
+@pytest.mark.django_db
+def test_oidc_login_views_success(client, mocker):
+ """
+ Simulate a successful login authentication with OpenID Connect
+ authorization code flow with PKCE.
+ """
+ # mock Keycloak client
+ kc_oidc_mock = mock_keycloak(mocker)
+
+ # user initiates login process
+ login_url = reverse("oidc-login")
+
+ # should redirect to Keycloak authentication page in order
+ # for a user to login with its username / password
+ response = check_html_get_response(client, login_url, status_code=302)
+ request = response.wsgi_request
+
+ assert isinstance(request.user, AnonymousUser)
+
+ login_data = _check_oidc_login_code_flow_data(
+ request,
+ response,
+ kc_oidc_mock,
+ redirect_uri=reverse("oidc-login-complete", request=request),
+ )
# once a user has identified himself in Keycloak, he is
# redirected to the 'oidc-login-complete' view to
@@ -346,46 +357,73 @@
Anonymous user should be refused access with forbidden response.
"""
url = reverse("oidc-generate-bearer-token")
- check_http_post_response(client, url, data={"password": "secret"}, status_code=403)
+ check_http_get_response(client, url, status_code=403)
-def _generate_bearer_token(client, password):
+def _generate_and_test_bearer_token(client, kc_oidc_mock):
+ # user authenticates
client.login(
code="code", code_verifier="code-verifier", redirect_uri="redirect-uri"
)
+ # user initiates bearer token generation flow
url = reverse("oidc-generate-bearer-token")
- return client.post(
- url, data={"password": password}, content_type="application/json"
+ response = check_http_get_response(client, url, status_code=302)
+ request = response.wsgi_request
+ redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request)
+ # check login data and redirection to Keycloak is valid
+ login_data = _check_oidc_login_code_flow_data(
+ request,
+ response,
+ kc_oidc_mock,
+ redirect_uri=redirect_uri,
+ scope="openid offline_access",
)
+ # once a user has identified himself in Keycloak, he is
+ # redirected to the 'oidc-generate-bearer-token-complete' view
+ # to get and save bearer token
-@pytest.mark.django_db
-def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker):
- """
- User with correct credentials should be allowed to generate a token.
- """
- kc_mock = mock_keycloak(mocker)
- password = "secret"
- response = _generate_bearer_token(client, password)
- user = response.wsgi_request.user
- assert response.status_code == 200
- assert response.content.decode("ascii") == kc_mock.offline_token(
- username=user.username, password=password
+ # generate authorization code / session state in the same
+ # manner as Keycloak
+ code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}"
+ session_state = str(uuid.uuid4())
+
+ token_complete_url = reverse(
+ "oidc-generate-bearer-token-complete",
+ query_params={
+ "code": code,
+ "state": login_data["state"],
+ "session_state": session_state,
+ },
)
+ nb_tokens = len(OIDCUserOfflineTokens.objects.all())
+ response = check_html_get_response(client, token_complete_url, status_code=302)
+ request = response.wsgi_request
+
+ # check token has been generated and saved encrypted to database
+ assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1
+ encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token
+ secret = get_config()["secret_key"].encode()
+ salt = request.user.sub.encode()
+ decrypted_token = decrypt_data(encrypted_token, secret, salt)
+ oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri)
+ assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"]
+
+ # should redirect to tokens management Web UI
+ assert response["location"] == reverse("oidc-profile") + "#tokens"
+
+ return decrypted_token
+
@pytest.mark.django_db
-def test_oidc_generate_bearer_token_authenticated_user_failure(client, mocker):
+def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker):
"""
- User with wrong credentials should not be allowed to generate a token.
+ Authenticated user should be able to generate a bearer token using OIDC
+ Authorization Code Flow.
"""
- response_code = 401
- kc_mock = mock_keycloak(mocker)
- kc_mock.offline_token.side_effect = KeycloakError(
- error_message="Invalid password", response_code=response_code
- )
- response = _generate_bearer_token(client, password="invalid-password")
- assert response.status_code == response_code
+ kc_oidc_mock = mock_keycloak(mocker)
+ _generate_and_test_bearer_token(client, kc_oidc_mock)
def test_oidc_list_bearer_tokens_anonymous_user(client):
@@ -403,12 +441,11 @@
"""
User with correct credentials should be allowed to list his tokens.
"""
- mock_keycloak(mocker)
+ kc_oidc_mock = mock_keycloak(mocker)
nb_tokens = 3
- password = "secret"
for _ in range(nb_tokens):
- response = _generate_bearer_token(client, password)
+ _generate_and_test_bearer_token(client, kc_oidc_mock)
url = reverse(
"oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10}
@@ -437,13 +474,11 @@
"""
User with correct credentials should be allowed to display a token.
"""
- mock_keycloak(mocker)
+ kc_oidc_mock = mock_keycloak(mocker)
nb_tokens = 3
- password = "secret"
for i in range(nb_tokens):
- response = _generate_bearer_token(client, password)
- token = response.content
+ token = _generate_and_test_bearer_token(client, kc_oidc_mock)
url = reverse("oidc-get-bearer-token")
@@ -451,30 +486,12 @@
client,
url,
status_code=200,
- data={"password": password, "token_id": i + 1},
+ data={"token_id": i + 1},
content_type="text/plain",
)
assert response.content == token
-@pytest.mark.django_db
-def test_oidc_get_bearer_token_invalid_password(client, mocker):
- """
- User with wrong credentials should not be allowed to display a token.
- """
- mock_keycloak(mocker)
- password = "secret"
- _generate_bearer_token(client, password)
-
- url = reverse("oidc-get-bearer-token")
- check_http_post_response(
- client,
- url,
- status_code=401,
- data={"password": "invalid-password", "token_id": 1},
- )
-
-
def test_oidc_revoke_bearer_tokens_anonymous_user(client):
"""
Anonymous user should be refused access with forbidden response.
@@ -488,46 +505,25 @@
"""
User with correct credentials should be allowed to revoke tokens.
"""
- mock_keycloak(mocker)
+ kc_oidc_mock = mock_keycloak(mocker)
nb_tokens = 3
- password = "secret"
for _ in range(nb_tokens):
- _generate_bearer_token(client, password)
+ _generate_and_test_bearer_token(client, kc_oidc_mock)
url = reverse("oidc-revoke-bearer-tokens")
check_http_post_response(
- client, url, status_code=200, data={"password": password, "token_ids": [1]},
+ client, url, status_code=200, data={"token_ids": [1]},
)
assert len(OIDCUserOfflineTokens.objects.all()) == 2
check_http_post_response(
- client, url, status_code=200, data={"password": password, "token_ids": [2, 3]},
+ client, url, status_code=200, data={"token_ids": [2, 3]},
)
assert len(OIDCUserOfflineTokens.objects.all()) == 0
-@pytest.mark.django_db
-def test_oidc_revoke_bearer_token_invalid_password(client, mocker):
- """
- User with wrong credentials should not be allowed to revoke tokens.
- """
- mock_keycloak(mocker)
- password = "secret"
-
- _generate_bearer_token(client, password)
-
- url = reverse("oidc-revoke-bearer-tokens")
-
- check_http_post_response(
- client,
- url,
- status_code=401,
- data={"password": "invalid-password", "token_ids": [1]},
- )
-
-
def test_oidc_profile_view_anonymous_user(client):
"""
Non authenticated users should be redirected to login page when
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Sep 17, 4:55 PM (21 h, 53 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3228470
Attached To
D4692: auth: Generate bearer token using OIDC Authorization Code flow
Event Timeline
Log In to Comment