Page MenuHomeSoftware Heritage

test_views.py
No OneTemporary

test_views.py

# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from urllib.parse import urljoin, urlparse
import uuid
import pytest
from django.contrib.auth.models import AnonymousUser, User
from django.http import QueryDict
from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens
from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data
from swh.web.common.utils import reverse
from swh.web.config import get_config
from swh.web.tests.django_asserts import assert_contains
from swh.web.tests.utils import (
check_html_get_response,
check_http_get_response,
check_http_post_response,
)
from swh.web.urls import _default_view as homepage_view
def _check_oidc_login_code_flow_data(
request, response, kc_oidc_mock, redirect_uri, scope="openid"
):
parsed_url = urlparse(response["location"])
authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"]
query_dict = QueryDict(parsed_url.query)
# check redirect url is valid
assert urljoin(response["location"], parsed_url.path) == authorization_url
assert "client_id" in query_dict
assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID
assert "response_type" in query_dict
assert query_dict["response_type"] == "code"
assert "redirect_uri" in query_dict
assert query_dict["redirect_uri"] == redirect_uri
assert "code_challenge_method" in query_dict
assert query_dict["code_challenge_method"] == "S256"
assert "scope" in query_dict
assert query_dict["scope"] == scope
assert "state" in query_dict
assert "code_challenge" in query_dict
# check a login_data has been registered in user session
assert "login_data" in request.session
login_data = request.session["login_data"]
assert "code_verifier" in login_data
assert "state" in login_data
assert "redirect_uri" in login_data
assert login_data["redirect_uri"] == query_dict["redirect_uri"]
return login_data
@pytest.mark.django_db
def test_oidc_login_views_success(client, keycloak_mock):
"""
Simulate a successful login authentication with OpenID Connect
authorization code flow with PKCE.
"""
# user initiates login process
login_url = reverse("oidc-login")
# should redirect to Keycloak authentication page in order
# for a user to login with its username / password
response = check_html_get_response(client, login_url, status_code=302)
request = response.wsgi_request
assert isinstance(request.user, AnonymousUser)
login_data = _check_oidc_login_code_flow_data(
request,
response,
keycloak_mock,
redirect_uri=reverse("oidc-login-complete", request=request),
)
# once a user has identified himself in Keycloak, he is
# redirected to the 'oidc-login-complete' view to
# login in Django.
# generate authorization code / session state in the same
# manner as Keycloak
code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}"
session_state = str(uuid.uuid4())
login_complete_url = reverse(
"oidc-login-complete",
query_params={
"code": code,
"state": login_data["state"],
"session_state": session_state,
},
)
# login process finalization, should redirect to root url by default
response = check_html_get_response(client, login_complete_url, status_code=302)
request = response.wsgi_request
assert response["location"] == request.build_absolute_uri("/")
# user should be authenticated
assert isinstance(request.user, OIDCUser)
# check remote user has not been saved to Django database
with pytest.raises(User.DoesNotExist):
User.objects.get(username=request.user.username)
@pytest.mark.django_db
def test_oidc_logout_view_success(client, keycloak_mock):
"""
Simulate a successful logout operation with OpenID Connect.
"""
# login our test user
client.login(code="", code_verifier="", redirect_uri="")
keycloak_mock.authorization_code.assert_called()
# user initiates logout
oidc_logout_url = reverse("oidc-logout")
# should redirect to logout page
response = check_html_get_response(client, oidc_logout_url, status_code=302)
request = response.wsgi_request
logout_url = reverse("logout", query_params={"remote_user": 1})
assert response["location"] == request.build_absolute_uri(logout_url)
# should have been logged out in Keycloak
oidc_profile = keycloak_mock.login()
keycloak_mock.logout.assert_called_with(oidc_profile["refresh_token"])
# check effective logout in Django
assert isinstance(request.user, AnonymousUser)
@pytest.mark.django_db
def test_oidc_login_view_failure(client, keycloak_mock):
"""
Simulate a failed authentication with OpenID Connect.
"""
keycloak_mock.set_auth_success(False)
# user initiates login process
login_url = reverse("oidc-login")
# should render an error page
response = check_html_get_response(
client, login_url, status_code=500, template_used="error.html"
)
request = response.wsgi_request
# no users should be logged in
assert isinstance(request.user, AnonymousUser)
# Simulate possible errors with OpenID Connect in the login complete view.
def test_oidc_login_complete_view_no_login_data(client, mocker):
# user initiates login process
login_url = reverse("oidc-login-complete")
# should render an error page
response = check_html_get_response(
client, login_url, status_code=500, template_used="error.html"
)
assert_contains(
response, "Login process has not been initialized.", status_code=500
)
def test_oidc_login_complete_view_missing_parameters(client, mocker):
# simulate login process has been initialized
session = client.session
session["login_data"] = {
"code_verifier": "",
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
}
session.save()
# user initiates login process
login_url = reverse("oidc-login-complete")
# should render an error page
response = check_html_get_response(
client, login_url, status_code=400, template_used="error.html"
)
request = response.wsgi_request
assert_contains(
response, "Missing query parameters for authentication.", status_code=400
)
# no user should be logged in
assert isinstance(request.user, AnonymousUser)
def test_oidc_login_complete_wrong_csrf_token(client, keycloak_mock):
# simulate login process has been initialized
session = client.session
session["login_data"] = {
"code_verifier": "",
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
}
session.save()
# user initiates login process
login_url = reverse(
"oidc-login-complete", query_params={"code": "some-code", "state": "some-state"}
)
# should render an error page
response = check_html_get_response(
client, login_url, status_code=400, template_used="error.html"
)
request = response.wsgi_request
assert_contains(
response, "Wrong CSRF token, aborting login process.", status_code=400
)
# no user should be logged in
assert isinstance(request.user, AnonymousUser)
@pytest.mark.django_db
def test_oidc_login_complete_wrong_code_verifier(client, keycloak_mock):
keycloak_mock.set_auth_success(False)
# simulate login process has been initialized
session = client.session
session["login_data"] = {
"code_verifier": "",
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
}
session.save()
# check authentication error is reported
login_url = reverse(
"oidc-login-complete",
query_params={"code": "some-code", "state": session["login_data"]["state"]},
)
# should render an error page
response = check_html_get_response(
client, login_url, status_code=500, template_used="error.html"
)
request = response.wsgi_request
assert_contains(response, "User authentication failed.", status_code=500)
# no user should be logged in
assert isinstance(request.user, AnonymousUser)
@pytest.mark.django_db
def test_oidc_logout_view_failure(client, keycloak_mock):
"""
Simulate a failed logout operation with OpenID Connect.
"""
# login our test user
client.login(code="", code_verifier="", redirect_uri="")
err_msg = "Authentication server error"
keycloak_mock.logout.side_effect = Exception(err_msg)
# user initiates logout process
logout_url = reverse("oidc-logout")
# should render an error page
response = check_html_get_response(
client, logout_url, status_code=500, template_used="error.html"
)
request = response.wsgi_request
assert_contains(response, err_msg, status_code=500)
# user should be logged out from Django anyway
assert isinstance(request.user, AnonymousUser)
def test_view_rendering_when_user_not_set_in_request(request_factory):
request = request_factory.get("/")
# Django RequestFactory do not set any user by default
assert not hasattr(request, "user")
response = homepage_view(request)
assert response.status_code == 200
def test_oidc_generate_bearer_token_anonymous_user(client):
"""
Anonymous user should be refused access with forbidden response.
"""
url = reverse("oidc-generate-bearer-token")
check_http_get_response(client, url, status_code=403)
def _generate_and_test_bearer_token(client, kc_oidc_mock):
# user authenticates
client.login(
code="code", code_verifier="code-verifier", redirect_uri="redirect-uri"
)
# user initiates bearer token generation flow
url = reverse("oidc-generate-bearer-token")
response = check_http_get_response(client, url, status_code=302)
request = response.wsgi_request
redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request)
# check login data and redirection to Keycloak is valid
login_data = _check_oidc_login_code_flow_data(
request,
response,
kc_oidc_mock,
redirect_uri=redirect_uri,
scope="openid offline_access",
)
# once a user has identified himself in Keycloak, he is
# redirected to the 'oidc-generate-bearer-token-complete' view
# to get and save bearer token
# generate authorization code / session state in the same
# manner as Keycloak
code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}"
session_state = str(uuid.uuid4())
token_complete_url = reverse(
"oidc-generate-bearer-token-complete",
query_params={
"code": code,
"state": login_data["state"],
"session_state": session_state,
},
)
nb_tokens = len(OIDCUserOfflineTokens.objects.all())
response = check_html_get_response(client, token_complete_url, status_code=302)
request = response.wsgi_request
# check token has been generated and saved encrypted to database
assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1
encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token
secret = get_config()["secret_key"].encode()
salt = request.user.sub.encode()
decrypted_token = decrypt_data(encrypted_token, secret, salt)
oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri)
assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"]
# should redirect to tokens management Web UI
assert response["location"] == reverse("oidc-profile") + "#tokens"
return decrypted_token
@pytest.mark.django_db
def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_mock):
"""
Authenticated user should be able to generate a bearer token using OIDC
Authorization Code Flow.
"""
_generate_and_test_bearer_token(client, keycloak_mock)
def test_oidc_list_bearer_tokens_anonymous_user(client):
"""
Anonymous user should be refused access with forbidden response.
"""
url = reverse(
"oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10}
)
check_http_get_response(client, url, status_code=403)
@pytest.mark.django_db
def test_oidc_list_bearer_tokens(client, keycloak_mock):
"""
User with correct credentials should be allowed to list his tokens.
"""
nb_tokens = 3
for _ in range(nb_tokens):
_generate_and_test_bearer_token(client, keycloak_mock)
url = reverse(
"oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10}
)
response = check_http_get_response(client, url, status_code=200)
tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"]))
for oidc_token in OIDCUserOfflineTokens.objects.all():
assert (
oidc_token.creation_date.isoformat()
== tokens_data[oidc_token.id - 1]["creation_date"]
)
def test_oidc_get_bearer_token_anonymous_user(client):
"""
Anonymous user should be refused access with forbidden response.
"""
url = reverse("oidc-get-bearer-token")
check_http_post_response(client, url, status_code=403)
@pytest.mark.django_db
def test_oidc_get_bearer_token(client, keycloak_mock):
"""
User with correct credentials should be allowed to display a token.
"""
nb_tokens = 3
for i in range(nb_tokens):
token = _generate_and_test_bearer_token(client, keycloak_mock)
url = reverse("oidc-get-bearer-token")
response = check_http_post_response(
client,
url,
status_code=200,
data={"token_id": i + 1},
content_type="text/plain",
)
assert response.content == token
def test_oidc_revoke_bearer_tokens_anonymous_user(client):
"""
Anonymous user should be refused access with forbidden response.
"""
url = reverse("oidc-revoke-bearer-tokens")
check_http_post_response(client, url, status_code=403)
@pytest.mark.django_db
def test_oidc_revoke_bearer_tokens(client, keycloak_mock):
"""
User with correct credentials should be allowed to revoke tokens.
"""
nb_tokens = 3
for _ in range(nb_tokens):
_generate_and_test_bearer_token(client, keycloak_mock)
url = reverse("oidc-revoke-bearer-tokens")
check_http_post_response(
client, url, status_code=200, data={"token_ids": [1]},
)
assert len(OIDCUserOfflineTokens.objects.all()) == 2
check_http_post_response(
client, url, status_code=200, data={"token_ids": [2, 3]},
)
assert len(OIDCUserOfflineTokens.objects.all()) == 0
def test_oidc_profile_view_anonymous_user(client):
"""
Non authenticated users should be redirected to login page when
requesting profile view.
"""
url = reverse("oidc-profile")
login_url = reverse("oidc-login", query_params={"next_path": url})
resp = check_html_get_response(client, url, status_code=302)
assert resp["location"] == login_url
@pytest.mark.django_db
def test_oidc_profile_view(client, keycloak_mock):
"""
Authenticated users should be able to request the profile page
and link to Keycloak account UI should be present.
"""
url = reverse("oidc-profile")
kc_config = get_config()["keycloak"]
user_permissions = ["perm1", "perm2"]
keycloak_mock.user_permissions = user_permissions
client.login(code="", code_verifier="", redirect_uri="")
resp = check_html_get_response(
client, url, status_code=200, template_used="auth/profile.html"
)
user = resp.wsgi_request.user
kc_account_url = (
f"{kc_config['server_url']}realms/{kc_config['realm_name']}/account/"
)
assert_contains(resp, kc_account_url)
assert_contains(resp, user.username)
assert_contains(resp, user.first_name)
assert_contains(resp, user.last_name)
assert_contains(resp, user.email)
for perm in user_permissions:
assert_contains(resp, perm)

File Metadata

Mime Type
text/x-python
Expires
Jul 4 2025, 9:33 AM (5 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3394774

Event Timeline