Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9312804
D5367.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
54 KB
Subscribers
None
D5367.diff
View Options
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
-swh.auth[django] >= 0.3.7
+swh.auth[django] >= 0.5.0
swh.core >= 0.0.95
swh.indexer >= 0.4.1
swh.model >= 0.5.0
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,7 +16,6 @@
prometheus-client
pybadges
pygments
-python-keycloak >= 0.19.0
python-magic >= 0.4.0
python-memcached
pyyaml
diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
deleted file mode 100644
--- a/swh/web/auth/backends.py
+++ /dev/null
@@ -1,190 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from datetime import datetime, timedelta
-import hashlib
-from typing import Any, Dict, Optional
-
-import sentry_sdk
-
-from django.core.cache import cache
-from django.http import HttpRequest
-from django.utils import timezone
-from rest_framework.authentication import BaseAuthentication
-from rest_framework.exceptions import AuthenticationFailed, ValidationError
-
-from swh.auth.django.models import OIDCUser
-from swh.web.auth.utils import get_oidc_client
-
-
-def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser:
- # compute an integer user identifier for Django User model
- # by concatenating all groups of the UUID4 user identifier
- # generated by Keycloak and converting it from hex to decimal
- user_id = int("".join(decoded_token["sub"].split("-")), 16)
-
- # create a Django user that will not be saved to database
- user = OIDCUser(
- id=user_id,
- username=decoded_token["preferred_username"],
- password="",
- first_name=decoded_token["given_name"],
- last_name=decoded_token["family_name"],
- email=decoded_token["email"],
- )
-
- # set is_staff user property based on groups
- if "groups" in decoded_token:
- user.is_staff = "/staff" in decoded_token["groups"]
-
- # extract user permissions if any
- resource_access = decoded_token.get("resource_access", {})
- client_resource_access = resource_access.get(get_oidc_client().client_id, {})
- user.permissions = set(client_resource_access.get("roles", []))
-
- # add user sub to custom User proxy model
- user.sub = decoded_token["sub"]
-
- return user
-
-
-def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser:
-
- oidc_client = get_oidc_client()
-
- # decode JWT token
- try:
- access_token = oidc_profile["access_token"]
- decoded_token = oidc_client.decode_token(access_token)
- # access token has expired or is invalid
- except Exception:
- # get a new access token from authentication provider
- oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"])
- # decode access token
- decoded_token = oidc_client.decode_token(oidc_profile["access_token"])
-
- # create OIDCUser from decoded token
- user = _oidc_user_from_decoded_token(decoded_token)
-
- # get authentication init datetime
- auth_datetime = datetime.fromtimestamp(decoded_token["iat"])
- exp_datetime = datetime.fromtimestamp(decoded_token["exp"])
-
- # compute OIDC tokens expiration date
- oidc_profile["expires_at"] = exp_datetime
- oidc_profile["refresh_expires_at"] = auth_datetime + timedelta(
- seconds=oidc_profile["refresh_expires_in"]
- )
-
- # add OIDC profile data to custom User proxy model
- for key, val in oidc_profile.items():
- if hasattr(user, key):
- setattr(user, key, val)
-
- # put OIDC profile in cache or update it after token renewal
- cache_key = f"oidc_user_{user.id}"
- if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]:
- # set cache key TTL as refresh token expiration time
- assert user.refresh_expires_at
- ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp())
-
- # save oidc_profile in cache
- cache.set(cache_key, oidc_profile, timeout=max(0, ttl))
-
- return user
-
-
-class OIDCAuthorizationCodePKCEBackend:
- def authenticate(
- self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str
- ) -> Optional[OIDCUser]:
-
- user = None
- try:
- # try to authenticate user with OIDC PKCE authorization code flow
- oidc_profile = get_oidc_client().authorization_code(
- code, redirect_uri, code_verifier=code_verifier
- )
-
- # create Django user
- user = _oidc_user_from_profile(oidc_profile)
-
- except Exception as e:
- sentry_sdk.capture_exception(e)
-
- return user
-
- def get_user(self, user_id: int) -> Optional[OIDCUser]:
- # get oidc profile from cache
- oidc_profile = cache.get(f"oidc_user_{user_id}")
- if oidc_profile:
- try:
- user = _oidc_user_from_profile(oidc_profile)
- # restore auth backend
- setattr(user, "backend", f"{__name__}.{self.__class__.__name__}")
- return user
- except Exception as e:
- sentry_sdk.capture_exception(e)
- return None
- else:
- return None
-
-
-class OIDCBearerTokenAuthentication(BaseAuthentication):
- def authenticate(self, request):
- auth_header = request.META.get("HTTP_AUTHORIZATION")
- if auth_header is None:
- return None
-
- try:
- auth_type, refresh_token = auth_header.split(" ", 1)
- except ValueError:
- raise AuthenticationFailed("Invalid HTTP authorization header format")
-
- if auth_type != "Bearer":
- raise AuthenticationFailed(
- (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).")
- )
- try:
-
- oidc_client = get_oidc_client()
-
- # compute a cache key from the token that does not exceed
- # memcached key size limit
- hasher = hashlib.sha1()
- hasher.update(refresh_token.encode("ascii"))
- cache_key = f"api_token_{hasher.hexdigest()}"
-
- # check if an access token is cached
- access_token = cache.get(cache_key)
-
- # attempt to decode access token
- try:
- decoded_token = oidc_client.decode_token(access_token)
- except Exception:
- # access token is None or it has expired
- decoded_token = None
-
- if access_token is None or decoded_token is None:
- # get a new access token from authentication provider
- access_token = oidc_client.refresh_token(refresh_token)["access_token"]
- # decode access token
- decoded_token = oidc_client.decode_token(access_token)
- # compute access token expiration
- exp = datetime.fromtimestamp(decoded_token["exp"])
- ttl = int(exp.timestamp() - timezone.now().timestamp())
- # save access token in cache while it is valid
- cache.set(cache_key, access_token, timeout=max(0, ttl))
-
- # create Django user
- user = _oidc_user_from_decoded_token(decoded_token)
- except UnicodeEncodeError as e:
- sentry_sdk.capture_exception(e)
- raise ValidationError("Invalid bearer token")
- except Exception as e:
- sentry_sdk.capture_exception(e)
- raise AuthenticationFailed(str(e))
-
- return user, None
diff --git a/swh/web/auth/middlewares.py b/swh/web/auth/middlewares.py
deleted file mode 100644
--- a/swh/web/auth/middlewares.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from django.contrib.auth import BACKEND_SESSION_KEY
-from django.http.response import HttpResponseRedirect
-
-from swh.web.common.utils import reverse
-
-
-class OIDCSessionExpiredMiddleware:
- """
- Middleware for checking OIDC user session expiration.
- """
-
- def __init__(self, get_response=None):
- self.get_response = get_response
- self.exempted_urls = [
- reverse(v)
- for v in ("logout", "oidc-login", "oidc-login-complete", "oidc-logout")
- ]
-
- def __call__(self, request):
- if (
- request.method != "GET"
- or request.user.is_authenticated
- or BACKEND_SESSION_KEY not in request.session
- or "OIDC" not in request.session[BACKEND_SESSION_KEY]
- or request.path in self.exempted_urls
- ):
- return self.get_response(request)
-
- # At that point, we know that a OIDC user was previously logged in
- # and his session has expired.
- # User will be redirected to logout page and a link will be offered to
- # login again.
- next_path = request.get_full_path()
- logout_url = reverse(
- "logout", query_params={"next_path": next_path, "remote_user": 1}
- )
- return HttpResponseRedirect(logout_url)
diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py
--- a/swh/web/auth/utils.py
+++ b/swh/web/auth/utils.py
@@ -4,45 +4,12 @@
# See top-level LICENSE file for more information
from base64 import urlsafe_b64encode
-import hashlib
-import secrets
-from typing import Dict, Tuple
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
-from swh.auth.keycloak import KeycloakOpenIDConnect
-from swh.web.config import get_config
-
-
-def gen_oidc_pkce_codes() -> Tuple[str, str]:
- """
- Generates a code verifier and a code challenge to be used
- with the OpenID Connect authorization code flow with PKCE
- ("Proof Key for Code Exchange", see https://tools.ietf.org/html/rfc7636).
-
- PKCE replaces the static secret used in the standard authorization
- code flow with a temporary one-time challenge, making it feasible
- to use in public clients.
-
- The implementation is inspired from that blog post:
- https://www.stefaanlippens.net/oauth-code-flow-pkce.html
- """
- # generate a code verifier which is a long enough random alphanumeric
- # string, only to be used "client side"
- code_verifier_str = secrets.token_urlsafe(60)
-
- # create the PKCE code challenge by hashing the code verifier with SHA256
- # and encoding the result in URL-safe base64 (without padding)
- code_challenge = hashlib.sha256(code_verifier_str.encode("ascii")).digest()
- code_challenge_str = urlsafe_b64encode(code_challenge).decode("ascii")
- code_challenge_str = code_challenge_str.replace("=", "")
-
- return code_verifier_str, code_challenge_str
-
-
OIDC_SWH_WEB_CLIENT_ID = "swh-web"
@@ -101,28 +68,3 @@
The decrypted data
"""
return _get_fernet(password, salt).decrypt(data)
-
-
-# stores instances of KeycloakOpenIDConnect class
-# dict keys are (realm_name, client_id) tuples
-_keycloak_oidc: Dict[str, KeycloakOpenIDConnect] = {}
-
-
-def get_oidc_client(client_id: str = OIDC_SWH_WEB_CLIENT_ID) -> KeycloakOpenIDConnect:
- """
- Instantiate a KeycloakOpenIDConnect class for a given client in the
- SoftwareHeritage realm.
-
- Args:
- client_id: client identifier in the SoftwareHeritage realm
-
- Returns:
- An object to ease the interaction with the Keycloak server
- """
- keycloak_config = get_config()["keycloak"]
-
- if client_id not in _keycloak_oidc:
- _keycloak_oidc[client_id] = KeycloakOpenIDConnect(
- keycloak_config["server_url"], keycloak_config["realm_name"], client_id
- )
- return _keycloak_oidc[client_id]
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
--- a/swh/web/auth/views.py
+++ b/swh/web/auth/views.py
@@ -5,14 +5,11 @@
import json
from typing import Any, Dict, cast
-import uuid
from cryptography.fernet import InvalidToken
from django.conf.urls import url
-from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.decorators import login_required
-from django.core.cache import cache
from django.core.paginator import Paginator
from django.http import HttpRequest
from django.http.response import (
@@ -25,126 +22,21 @@
from django.views.decorators.http import require_http_methods
from swh.auth.django.models import OIDCUser
+from swh.auth.django.utils import keycloak_oidc_client
+from swh.auth.django.views import get_oidc_login_data, oidc_login_view
+from swh.auth.django.views import urlpatterns as auth_urlpatterns
from swh.web.auth.models import OIDCUserOfflineTokens
-from swh.web.auth.utils import (
- decrypt_data,
- encrypt_data,
- gen_oidc_pkce_codes,
- get_oidc_client,
-)
-from swh.web.common.exc import BadInputExc, ForbiddenExc
+from swh.web.auth.utils import decrypt_data, encrypt_data
+from swh.web.common.exc import ForbiddenExc
from swh.web.common.utils import reverse
from swh.web.config import get_config
-def _oidc_login(request: HttpRequest, redirect_uri: str, scope: str = "openid"):
- # generate a CSRF token
- state = str(uuid.uuid4())
-
- code_verifier, code_challenge = gen_oidc_pkce_codes()
-
- request.session["login_data"] = {
- "code_verifier": code_verifier,
- "state": state,
- "redirect_uri": redirect_uri,
- "next_path": request.GET.get("next_path", ""),
- }
-
- authorization_url_params = {
- "state": state,
- "code_challenge": code_challenge,
- "code_challenge_method": "S256",
- "scope": scope,
- }
-
- oidc_client = get_oidc_client()
- authorization_url = oidc_client.authorization_url(
- redirect_uri, **authorization_url_params
- )
-
- return HttpResponseRedirect(authorization_url)
-
-
-def oidc_login(request: HttpRequest) -> HttpResponse:
- """
- Django view to initiate login process using OpenID Connect.
- """
-
- redirect_uri = reverse("oidc-login-complete", request=request)
-
- return _oidc_login(request, redirect_uri=redirect_uri)
-
-
-def _get_login_data(request: HttpRequest) -> Dict[str, Any]:
- if "login_data" not in request.session:
- raise Exception("Login process has not been initialized.")
-
- return request.session["login_data"]
-
-
-def _check_login_data(request: HttpRequest, login_data: Dict[str, Any]):
-
- if "code" not in request.GET or "state" not in request.GET:
- raise BadInputExc("Missing query parameters for authentication.")
-
- # get CSRF token returned by OIDC server
- state = request.GET["state"]
-
- if state != login_data["state"]:
- raise BadInputExc("Wrong CSRF token, aborting login process.")
-
-
-def oidc_login_complete(request: HttpRequest) -> HttpResponse:
- """
- Django view to finalize login process using OpenID Connect.
- """
- login_data = _get_login_data(request)
- next_path = login_data["next_path"] or request.build_absolute_uri("/")
-
- if "error" in request.GET:
- raise Exception(request.GET["error"])
-
- _check_login_data(request, login_data)
-
- user = authenticate(
- request=request,
- code=request.GET["code"],
- code_verifier=login_data["code_verifier"],
- redirect_uri=login_data["redirect_uri"],
- )
-
- if user is None:
- raise Exception("User authentication failed.")
-
- login(request, user)
-
- return HttpResponseRedirect(next_path)
-
-
-def oidc_logout(request: HttpRequest) -> HttpResponse:
- """
- Django view to logout using OpenID Connect.
- """
- user = request.user
- logout(request)
- if hasattr(user, "refresh_token"):
- oidc_client = get_oidc_client()
- user = cast(OIDCUser, user)
- refresh_token = cast(str, user.refresh_token)
- # end OpenID Connect session
- oidc_client.logout(refresh_token)
- # remove user data from cache
- cache.delete(f"oidc_user_{user.id}")
-
- logout_url = reverse("logout", query_params={"remote_user": 1})
- return HttpResponseRedirect(request.build_absolute_uri(logout_url))
-
-
def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request)
- return _oidc_login(
+ return oidc_login_view(
request, redirect_uri=redirect_uri, scope="openid offline_access"
)
@@ -155,9 +47,8 @@
if "error" in request.GET:
raise Exception(request.GET["error"])
- oidc_client = get_oidc_client()
- login_data = _get_login_data(request)
- _check_login_data(request, login_data)
+ login_data = get_oidc_login_data(request)
+ oidc_client = keycloak_oidc_client()
oidc_profile = oidc_client.authorization_code(
code=request.GET["code"],
code_verifier=login_data["code_verifier"],
@@ -227,7 +118,7 @@
secret = get_config()["secret_key"].encode()
salt = user.sub.encode()
decrypted_token = decrypt_data(token_data.offline_token, secret, salt)
- oidc_client = get_oidc_client()
+ oidc_client = keycloak_oidc_client()
oidc_client.logout(decrypted_token.decode("ascii"))
token_data.delete()
return HttpResponse(status=200)
@@ -240,10 +131,7 @@
return render(request, "auth/profile.html")
-urlpatterns = [
- url(r"^oidc/login/$", oidc_login, name="oidc-login"),
- url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"),
- url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"),
+urlpatterns = auth_urlpatterns + [
url(
r"^oidc/generate-bearer-token/$",
oidc_generate_bearer_token,
diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py
--- a/swh/web/settings/common.py
+++ b/swh/web/settings/common.py
@@ -12,6 +12,7 @@
import sys
from typing import Any, Dict
+from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID
from swh.web.config import get_config
swh_web_config = get_config()
@@ -57,7 +58,7 @@
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
- "swh.web.auth.middlewares.OIDCSessionExpiredMiddleware",
+ "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
"swh.web.common.middlewares.ThrottlingHeadersMiddleware",
@@ -166,7 +167,7 @@
"DEFAULT_THROTTLE_RATES": throttle_rates,
"DEFAULT_AUTHENTICATION_CLASSES": [
"rest_framework.authentication.SessionAuthentication",
- "swh.web.auth.backends.OIDCBearerTokenAuthentication",
+ "swh.auth.django.backends.OIDCBearerTokenAuthentication",
],
"EXCEPTION_HANDLER": "swh.web.api.apiresponse.error_response_handler",
}
@@ -278,5 +279,10 @@
AUTHENTICATION_BACKENDS = [
"django.contrib.auth.backends.ModelBackend",
- "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend",
+ "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend",
]
+
+SWH_AUTH_SERVER_URL = swh_web_config["keycloak"]["server_url"]
+SWH_AUTH_REALM_NAME = swh_web_config["keycloak"]["realm_name"]
+SWH_AUTH_CLIENT_ID = OIDC_SWH_WEB_CLIENT_ID
+SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW = "logout"
diff --git a/swh/web/templates/layout.html b/swh/web/templates/layout.html
--- a/swh/web/templates/layout.html
+++ b/swh/web/templates/layout.html
@@ -118,7 +118,7 @@
Logged in as
{% if 'OIDC' in user.backend %}
<a href="{% url 'oidc-profile' %}"><strong>{{ user.username }}</strong></a>,
- <a href="{% url 'oidc-logout' %}">logout</a>
+ <a href="{% url 'oidc-logout' %}?next_path={% url 'logout' %}?remote_user=1">logout</a>
{% else %}
<strong>{{ user.username }}</strong>,
<a href="{{ logout_url }}">logout</a>
diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/tests/api/views/test_graph.py
--- a/swh/web/tests/api/views/test_graph.py
+++ b/swh/web/tests/api/views/test_graph.py
@@ -38,21 +38,21 @@
check_http_get_response(api_client, url, status_code=401)
-def _authenticate_graph_user(api_client, keycloak_mock):
- keycloak_mock.user_permissions = [API_GRAPH_PERM]
- oidc_profile = keycloak_mock.login()
+def _authenticate_graph_user(api_client, keycloak_oidc):
+ keycloak_oidc.user_permissions = [API_GRAPH_PERM]
+ oidc_profile = keycloak_oidc.login()
api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}")
-def test_graph_endpoint_needs_permission(api_client, keycloak_mock, requests_mock):
+def test_graph_endpoint_needs_permission(api_client, keycloak_oidc, requests_mock):
graph_query = "stats"
url = reverse("api-1-graph", url_args={"graph_query": graph_query})
- oidc_profile = keycloak_mock.login()
+ oidc_profile = keycloak_oidc.login()
api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}")
check_http_get_response(api_client, url, status_code=403)
- _authenticate_graph_user(api_client, keycloak_mock)
+ _authenticate_graph_user(api_client, keycloak_oidc)
requests_mock.get(
get_config()["graph"]["server_url"] + graph_query,
json={},
@@ -61,8 +61,8 @@
check_http_get_response(api_client, url, status_code=200)
-def test_graph_text_plain_response(api_client, keycloak_mock, requests_mock):
- _authenticate_graph_user(api_client, keycloak_mock)
+def test_graph_text_plain_response(api_client, keycloak_oidc, requests_mock):
+ _authenticate_graph_user(api_client, keycloak_oidc)
graph_query = "leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323"
@@ -114,8 +114,8 @@
}
-def test_graph_json_response(api_client, keycloak_mock, requests_mock):
- _authenticate_graph_user(api_client, keycloak_mock)
+def test_graph_json_response(api_client, keycloak_oidc, requests_mock):
+ _authenticate_graph_user(api_client, keycloak_oidc)
graph_query = "stats"
@@ -132,8 +132,8 @@
assert resp.content == json.dumps(_response_json).encode()
-def test_graph_ndjson_response(api_client, keycloak_mock, requests_mock):
- _authenticate_graph_user(api_client, keycloak_mock)
+def test_graph_ndjson_response(api_client, keycloak_oidc, requests_mock):
+ _authenticate_graph_user(api_client, keycloak_oidc)
graph_query = "visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb"
@@ -167,7 +167,7 @@
@given(origin())
def test_graph_response_resolve_origins(
- archive_data, api_client, keycloak_mock, requests_mock, origin
+ archive_data, api_client, keycloak_oidc, requests_mock, origin
):
hasher = hashlib.sha1()
hasher.update(origin["url"].encode())
@@ -182,7 +182,7 @@
)
)
- _authenticate_graph_user(api_client, keycloak_mock)
+ _authenticate_graph_user(api_client, keycloak_oidc)
for graph_query, response_text, content_type in (
(
@@ -238,9 +238,9 @@
def test_graph_response_resolve_origins_nothing_to_do(
- api_client, keycloak_mock, requests_mock
+ api_client, keycloak_oidc, requests_mock
):
- _authenticate_graph_user(api_client, keycloak_mock)
+ _authenticate_graph_user(api_client, keycloak_oidc)
graph_query = "stats"
diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py
deleted file mode 100644
--- a/swh/web/tests/auth/test_api_auth.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import pytest
-
-from django.contrib.auth.models import AnonymousUser, User
-
-from swh.auth.django.models import OIDCUser
-from swh.web.common.utils import reverse
-from swh.web.tests.utils import check_api_get_responses, check_http_get_response
-
-
-@pytest.mark.django_db
-def test_drf_django_session_auth_success(keycloak_mock, client):
- """
- Check user gets authenticated when querying the web api
- through a web browser.
- """
- url = reverse("api-1-stat-counters")
-
- client.login(code="", code_verifier="", redirect_uri="")
-
- response = check_http_get_response(client, url, status_code=200)
- request = response.wsgi_request
-
- # user should be authenticated
- assert isinstance(request.user, OIDCUser)
-
- # check remoter used has not been saved to Django database
- with pytest.raises(User.DoesNotExist):
- User.objects.get(username=request.user.username)
-
-
-@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_success(keycloak_mock, api_client):
- """
- Check user gets authenticated when querying the web api
- through an HTTP client using bearer token authentication.
- """
- url = reverse("api-1-stat-counters")
-
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
-
- api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
-
- response = check_api_get_responses(api_client, url, status_code=200)
- request = response.wsgi_request
-
- # user should be authenticated
- assert isinstance(request.user, OIDCUser)
-
- # check remoter used has not been saved to Django database
- with pytest.raises(User.DoesNotExist):
- User.objects.get(username=request.user.username)
-
-
-@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_failure(keycloak_mock, api_client):
- url = reverse("api-1-stat-counters")
-
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
-
- # check for failed authentication but with expected token format
- keycloak_mock.set_auth_success(False)
- api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
-
- response = check_api_get_responses(api_client, url, status_code=403)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
-
- # check for failed authentication when token format is invalid
- api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà")
-
- response = check_api_get_responses(api_client, url, status_code=400)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
-
-
-def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_mock, api_client):
- url = reverse("api-1-stat-counters")
-
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
-
- # missing authorization type
- api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}")
-
- response = check_api_get_responses(api_client, url, status_code=403)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
-
- # invalid authorization type
- api_client.credentials(HTTP_AUTHORIZATION="Foo token")
-
- response = check_api_get_responses(api_client, url, status_code=403)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
deleted file mode 100644
--- a/swh/web/tests/auth/test_backends.py
+++ /dev/null
@@ -1,271 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from datetime import datetime, timedelta
-from unittest.mock import Mock
-
-import pytest
-
-from django.conf import settings
-from django.contrib.auth import authenticate, get_backends
-from rest_framework.exceptions import AuthenticationFailed
-
-from swh.auth.django.models import OIDCUser
-from swh.web.auth.backends import OIDCBearerTokenAuthentication
-from swh.web.common.utils import reverse
-
-
-def _authenticate_user(request_factory):
- request = request_factory.get(reverse("oidc-login-complete"))
-
- return authenticate(
- request=request,
- code="some-code",
- code_verifier="some-code-verifier",
- redirect_uri="https://localhost:5004",
- )
-
-
-def _check_authenticated_user(user, decoded_token, kc_oidc_mock):
- assert user is not None
- assert isinstance(user, OIDCUser)
- assert user.id != 0
- assert user.username == decoded_token["preferred_username"]
- assert user.password == ""
- assert user.first_name == decoded_token["given_name"]
- assert user.last_name == decoded_token["family_name"]
- assert user.email == decoded_token["email"]
- assert user.is_staff == ("/staff" in decoded_token["groups"])
- assert user.sub == decoded_token["sub"]
- resource_access = decoded_token.get("resource_access", {})
- resource_access_client = resource_access.get(kc_oidc_mock, {})
- assert user.permissions == set(resource_access_client.get("roles", []))
-
-
-@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_success(keycloak_mock, request_factory):
- """
- Checks successful login based on OpenID Connect with PKCE extension
- Django authentication backend (login from Web UI).
- """
- keycloak_mock.user_groups = ["/staff"]
-
- oidc_profile = keycloak_mock.login()
- user = _authenticate_user(request_factory)
-
- decoded_token = keycloak_mock.decode_token(user.access_token)
- _check_authenticated_user(user, decoded_token, keycloak_mock)
-
- auth_datetime = datetime.fromtimestamp(decoded_token["iat"])
- exp_datetime = datetime.fromtimestamp(decoded_token["exp"])
- refresh_exp_datetime = auth_datetime + timedelta(
- seconds=oidc_profile["refresh_expires_in"]
- )
-
- assert user.access_token == oidc_profile["access_token"]
- assert user.expires_at == exp_datetime
- assert user.id_token == oidc_profile["id_token"]
- assert user.refresh_token == oidc_profile["refresh_token"]
- assert user.refresh_expires_at == refresh_exp_datetime
- assert user.scope == oidc_profile["scope"]
- assert user.session_state == oidc_profile["session_state"]
-
- backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend"
- assert user.backend == backend_path
- backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path)
- assert get_backends()[backend_idx].get_user(user.id) == user
-
-
-@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_failure(keycloak_mock, request_factory):
- """
- Checks failed login based on OpenID Connect with PKCE extension Django
- authentication backend (login from Web UI).
- """
- keycloak_mock.set_auth_success(False)
-
- user = _authenticate_user(request_factory)
-
- assert user is None
-
-
-@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_refresh_token_success(
- keycloak_mock, request_factory
-):
- """
- Checks access token renewal success using refresh token.
- """
-
- oidc_profile = keycloak_mock.login()
- decoded_token = keycloak_mock.decode_token(oidc_profile["access_token"])
- new_access_token = "new_access_token"
-
- def _refresh_token(refresh_token):
- oidc_profile = dict(keycloak_mock.login())
- oidc_profile["access_token"] = new_access_token
- return oidc_profile
-
- def _decode_token(access_token):
- if access_token != new_access_token:
- raise Exception("access token token has expired")
- else:
- return decoded_token
-
- keycloak_mock.decode_token = Mock()
- keycloak_mock.decode_token.side_effect = _decode_token
- keycloak_mock.refresh_token.side_effect = _refresh_token
-
- user = _authenticate_user(request_factory)
-
- oidc_profile = keycloak_mock.login()
- keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"])
-
- assert user is not None
-
-
-@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_refresh_token_failure(
- keycloak_mock, request_factory
-):
- """
- Checks access token renewal failure using refresh token.
- """
-
- def _refresh_token(refresh_token):
- raise Exception("OIDC session has expired")
-
- def _decode_token(access_token):
- raise Exception("access token token has expired")
-
- keycloak_mock.decode_token = Mock()
- keycloak_mock.decode_token.side_effect = _decode_token
- keycloak_mock.refresh_token.side_effect = _refresh_token
-
- user = _authenticate_user(request_factory)
-
- oidc_profile = keycloak_mock.login()
- keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"])
-
- assert user is None
-
-
-@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_permissions(keycloak_mock, request_factory):
- """
- Checks that a permission defined with OpenID Connect is correctly mapped
- to a Django one when logging from Web UI.
- """
- permission = "webapp.some-permission"
- keycloak_mock.user_permissions = [permission]
- user = _authenticate_user(request_factory)
- assert user.has_perm(permission)
- assert user.get_all_permissions() == {permission}
- assert user.get_group_permissions() == {permission}
- assert user.has_module_perms("webapp")
- assert not user.has_module_perms("foo")
-
-
-@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_backend_success(keycloak_mock, api_request_factory):
- """
- Checks successful login based on OpenID Connect bearer token Django REST
- Framework authentication backend (Web API login).
- """
- url = reverse("api-1-stat-counters")
- drf_auth_backend = OIDCBearerTokenAuthentication()
-
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
- access_token = oidc_profile["access_token"]
-
- decoded_token = keycloak_mock.decode_token(access_token)
-
- request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
-
- user, _ = drf_auth_backend.authenticate(request)
- _check_authenticated_user(user, decoded_token, keycloak_mock)
- # oidc_profile is not filled when authenticating through bearer token
- assert hasattr(user, "access_token") and user.access_token is None
-
-
-@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_mock, api_request_factory):
- """
- Checks failed login based on OpenID Connect bearer token Django REST
- Framework authentication backend (Web API login).
- """
- url = reverse("api-1-stat-counters")
- drf_auth_backend = OIDCBearerTokenAuthentication()
-
- oidc_profile = keycloak_mock.login()
-
- # simulate a failed authentication with a bearer token in expected format
- keycloak_mock.set_auth_success(False)
-
- refresh_token = oidc_profile["refresh_token"]
-
- request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
-
- with pytest.raises(AuthenticationFailed):
- drf_auth_backend.authenticate(request)
-
- # simulate a failed authentication with an invalid bearer token format
- request = api_request_factory.get(
- url, HTTP_AUTHORIZATION="Bearer invalid-token-format"
- )
-
- with pytest.raises(AuthenticationFailed):
- drf_auth_backend.authenticate(request)
-
-
-def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_mock, api_request_factory):
- """
- Checks failed login based on OpenID Connect bearer token Django REST
- Framework authentication backend (Web API login) due to invalid
- authorization header value.
- """
- url = reverse("api-1-stat-counters")
- drf_auth_backend = OIDCBearerTokenAuthentication()
-
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
-
- # Invalid authorization type
- request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token")
-
- with pytest.raises(AuthenticationFailed):
- drf_auth_backend.authenticate(request)
-
- # Missing authorization type
- request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}")
-
- with pytest.raises(AuthenticationFailed):
- drf_auth_backend.authenticate(request)
-
-
-@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_backend_permissions(
- keycloak_mock, api_request_factory
-):
- """
- Checks that a permission defined with OpenID Connect is correctly mapped
- to a Django one when using bearer token authentication.
- """
- permission = "webapp.some-permission"
- keycloak_mock.user_permissions = [permission]
-
- drf_auth_backend = OIDCBearerTokenAuthentication()
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
- url = reverse("api-1-stat-counters")
- request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
- user, _ = drf_auth_backend.authenticate(request)
-
- assert user.has_perm(permission)
- assert user.get_all_permissions() == {permission}
- assert user.get_group_permissions() == {permission}
- assert user.has_module_perms("webapp")
- assert not user.has_module_perms("foo")
diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py
deleted file mode 100644
--- a/swh/web/tests/auth/test_middlewares.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-import pytest
-
-from django.core.cache import cache
-from django.test import modify_settings
-
-from swh.web.common.utils import reverse
-from swh.web.tests.utils import check_html_get_response
-
-
-@pytest.mark.django_db
-@modify_settings(
- MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]}
-)
-def test_oidc_session_expired_middleware_disabled(client, keycloak_mock):
- # authenticate user
-
- client.login(code="", code_verifier="", redirect_uri="")
- keycloak_mock.authorization_code.assert_called()
-
- url = reverse("swh-web-homepage")
-
- # visit url first to get user from response
- response = check_html_get_response(client, url, status_code=200)
-
- # simulate OIDC session expiration
- cache.delete(f"oidc_user_{response.wsgi_request.user.id}")
-
- # no redirection when session has expired
- check_html_get_response(client, url, status_code=200)
-
-
-@pytest.mark.django_db
-def test_oidc_session_expired_middleware_enabled(client, keycloak_mock):
- # authenticate user
- client.login(code="", code_verifier="", redirect_uri="")
- keycloak_mock.authorization_code.assert_called()
-
- url = reverse("swh-web-homepage")
-
- # visit url first to get user from response
- response = check_html_get_response(client, url, status_code=200)
-
- # simulate OIDC session expiration
- cache.delete(f"oidc_user_{response.wsgi_request.user.id}")
-
- # should redirect to logout page
- resp = check_html_get_response(client, url, status_code=302)
- silent_refresh_url = reverse(
- "logout", query_params={"next_path": url, "remote_user": 1}
- )
- assert resp["location"] == silent_refresh_url
diff --git a/swh/web/tests/auth/test_utils.py b/swh/web/tests/auth/test_utils.py
--- a/swh/web/tests/auth/test_utils.py
+++ b/swh/web/tests/auth/test_utils.py
@@ -3,40 +3,11 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from base64 import urlsafe_b64encode
-import hashlib
-import re
from cryptography.fernet import InvalidToken
import pytest
-from swh.web.auth.utils import decrypt_data, encrypt_data, gen_oidc_pkce_codes
-
-
-def test_gen_oidc_pkce_codes():
- """
- Check generated PKCE codes respect the specification
- (see https://tools.ietf.org/html/rfc7636#section-4.1)
- """
- code_verifier, code_challenge = gen_oidc_pkce_codes()
-
- # check the code verifier only contains allowed characters
- assert re.match(r"[a-zA-Z0-9-\._~]+", code_verifier)
-
- # check minimum and maximum authorized length for the
- # code verifier
- assert len(code_verifier) >= 43
- assert len(code_verifier) <= 128
-
- # compute code challenge from code verifier
- challenge = hashlib.sha256(code_verifier.encode("ascii")).digest()
- challenge = urlsafe_b64encode(challenge).decode("ascii")
- challenge = challenge.replace("=", "")
-
- # check base64 padding is not present
- assert not code_challenge[-1].endswith("=")
- # check code challenge is valid
- assert code_challenge == challenge
+from swh.web.auth.utils import decrypt_data, encrypt_data
def test_encrypt_decrypt_data_ok():
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py
--- a/swh/web/tests/auth/test_views.py
+++ b/swh/web/tests/auth/test_views.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -9,10 +9,8 @@
import pytest
-from django.contrib.auth.models import AnonymousUser, User
from django.http import QueryDict
-from swh.auth.django.models import OIDCUser
from swh.web.auth.models import OIDCUserOfflineTokens
from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data
from swh.web.common.utils import reverse
@@ -27,11 +25,11 @@
def _check_oidc_login_code_flow_data(
- request, response, kc_oidc_mock, redirect_uri, scope="openid"
+ request, response, keycloak_oidc, redirect_uri, scope="openid"
):
parsed_url = urlparse(response["location"])
- authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"]
+ authorization_url = keycloak_oidc.well_known()["authorization_endpoint"]
query_dict = QueryDict(parsed_url.query)
# check redirect url is valid
@@ -59,233 +57,6 @@
return login_data
-@pytest.mark.django_db
-def test_oidc_login_views_success(client, keycloak_mock):
- """
- Simulate a successful login authentication with OpenID Connect
- authorization code flow with PKCE.
- """
- # user initiates login process
- login_url = reverse("oidc-login")
-
- # should redirect to Keycloak authentication page in order
- # for a user to login with its username / password
- response = check_html_get_response(client, login_url, status_code=302)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
-
- login_data = _check_oidc_login_code_flow_data(
- request,
- response,
- keycloak_mock,
- redirect_uri=reverse("oidc-login-complete", request=request),
- )
-
- # once a user has identified himself in Keycloak, he is
- # redirected to the 'oidc-login-complete' view to
- # login in Django.
-
- # generate authorization code / session state in the same
- # manner as Keycloak
- code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}"
- session_state = str(uuid.uuid4())
-
- login_complete_url = reverse(
- "oidc-login-complete",
- query_params={
- "code": code,
- "state": login_data["state"],
- "session_state": session_state,
- },
- )
-
- # login process finalization, should redirect to root url by default
- response = check_html_get_response(client, login_complete_url, status_code=302)
- request = response.wsgi_request
-
- assert response["location"] == request.build_absolute_uri("/")
-
- # user should be authenticated
- assert isinstance(request.user, OIDCUser)
-
- # check remote user has not been saved to Django database
- with pytest.raises(User.DoesNotExist):
- User.objects.get(username=request.user.username)
-
-
-@pytest.mark.django_db
-def test_oidc_logout_view_success(client, keycloak_mock):
- """
- Simulate a successful logout operation with OpenID Connect.
- """
- # login our test user
- client.login(code="", code_verifier="", redirect_uri="")
- keycloak_mock.authorization_code.assert_called()
-
- # user initiates logout
- oidc_logout_url = reverse("oidc-logout")
-
- # should redirect to logout page
- response = check_html_get_response(client, oidc_logout_url, status_code=302)
- request = response.wsgi_request
-
- logout_url = reverse("logout", query_params={"remote_user": 1})
- assert response["location"] == request.build_absolute_uri(logout_url)
-
- # should have been logged out in Keycloak
- oidc_profile = keycloak_mock.login()
- keycloak_mock.logout.assert_called_with(oidc_profile["refresh_token"])
-
- # check effective logout in Django
- assert isinstance(request.user, AnonymousUser)
-
-
-@pytest.mark.django_db
-def test_oidc_login_view_failure(client, keycloak_mock):
- """
- Simulate a failed authentication with OpenID Connect.
- """
- keycloak_mock.set_auth_success(False)
-
- # user initiates login process
- login_url = reverse("oidc-login")
- # should render an error page
- response = check_html_get_response(
- client, login_url, status_code=500, template_used="error.html"
- )
- request = response.wsgi_request
-
- # no users should be logged in
- assert isinstance(request.user, AnonymousUser)
-
-
-# Simulate possible errors with OpenID Connect in the login complete view.
-
-
-def test_oidc_login_complete_view_no_login_data(client, mocker):
- # user initiates login process
- login_url = reverse("oidc-login-complete")
- # should render an error page
- response = check_html_get_response(
- client, login_url, status_code=500, template_used="error.html"
- )
-
- assert_contains(
- response, "Login process has not been initialized.", status_code=500
- )
-
-
-def test_oidc_login_complete_view_missing_parameters(client, mocker):
- # simulate login process has been initialized
- session = client.session
- session["login_data"] = {
- "code_verifier": "",
- "state": str(uuid.uuid4()),
- "redirect_uri": "",
- "next_path": "",
- }
- session.save()
-
- # user initiates login process
- login_url = reverse("oidc-login-complete")
- # should render an error page
- response = check_html_get_response(
- client, login_url, status_code=400, template_used="error.html"
- )
- request = response.wsgi_request
- assert_contains(
- response, "Missing query parameters for authentication.", status_code=400
- )
-
- # no user should be logged in
- assert isinstance(request.user, AnonymousUser)
-
-
-def test_oidc_login_complete_wrong_csrf_token(client, keycloak_mock):
- # simulate login process has been initialized
- session = client.session
- session["login_data"] = {
- "code_verifier": "",
- "state": str(uuid.uuid4()),
- "redirect_uri": "",
- "next_path": "",
- }
- session.save()
-
- # user initiates login process
- login_url = reverse(
- "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"}
- )
-
- # should render an error page
- response = check_html_get_response(
- client, login_url, status_code=400, template_used="error.html"
- )
- request = response.wsgi_request
- assert_contains(
- response, "Wrong CSRF token, aborting login process.", status_code=400
- )
-
- # no user should be logged in
- assert isinstance(request.user, AnonymousUser)
-
-
-@pytest.mark.django_db
-def test_oidc_login_complete_wrong_code_verifier(client, keycloak_mock):
- keycloak_mock.set_auth_success(False)
-
- # simulate login process has been initialized
- session = client.session
- session["login_data"] = {
- "code_verifier": "",
- "state": str(uuid.uuid4()),
- "redirect_uri": "",
- "next_path": "",
- }
- session.save()
-
- # check authentication error is reported
- login_url = reverse(
- "oidc-login-complete",
- query_params={"code": "some-code", "state": session["login_data"]["state"]},
- )
-
- # should render an error page
- response = check_html_get_response(
- client, login_url, status_code=500, template_used="error.html"
- )
- request = response.wsgi_request
- assert_contains(response, "User authentication failed.", status_code=500)
-
- # no user should be logged in
- assert isinstance(request.user, AnonymousUser)
-
-
-@pytest.mark.django_db
-def test_oidc_logout_view_failure(client, keycloak_mock):
- """
- Simulate a failed logout operation with OpenID Connect.
- """
- # login our test user
- client.login(code="", code_verifier="", redirect_uri="")
-
- err_msg = "Authentication server error"
- keycloak_mock.logout.side_effect = Exception(err_msg)
-
- # user initiates logout process
- logout_url = reverse("oidc-logout")
- # should render an error page
- response = check_html_get_response(
- client, logout_url, status_code=500, template_used="error.html"
- )
- request = response.wsgi_request
- assert_contains(response, err_msg, status_code=500)
-
- # user should be logged out from Django anyway
- assert isinstance(request.user, AnonymousUser)
-
-
def test_view_rendering_when_user_not_set_in_request(request_factory):
request = request_factory.get("/")
# Django RequestFactory do not set any user by default
@@ -341,7 +112,7 @@
)
nb_tokens = len(OIDCUserOfflineTokens.objects.all())
- response = check_html_get_response(client, token_complete_url, status_code=302)
+ response = check_http_get_response(client, token_complete_url, status_code=302)
request = response.wsgi_request
# check token has been generated and saved encrypted to database
@@ -360,12 +131,12 @@
@pytest.mark.django_db
-def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_mock):
+def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_oidc):
"""
Authenticated user should be able to generate a bearer token using OIDC
Authorization Code Flow.
"""
- _generate_and_test_bearer_token(client, keycloak_mock)
+ _generate_and_test_bearer_token(client, keycloak_oidc)
def test_oidc_list_bearer_tokens_anonymous_user(client):
@@ -379,14 +150,14 @@
@pytest.mark.django_db
-def test_oidc_list_bearer_tokens(client, keycloak_mock):
+def test_oidc_list_bearer_tokens(client, keycloak_oidc):
"""
User with correct credentials should be allowed to list his tokens.
"""
nb_tokens = 3
for _ in range(nb_tokens):
- _generate_and_test_bearer_token(client, keycloak_mock)
+ _generate_and_test_bearer_token(client, keycloak_oidc)
url = reverse(
"oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10}
@@ -411,14 +182,14 @@
@pytest.mark.django_db
-def test_oidc_get_bearer_token(client, keycloak_mock):
+def test_oidc_get_bearer_token(client, keycloak_oidc):
"""
User with correct credentials should be allowed to display a token.
"""
nb_tokens = 3
for i in range(nb_tokens):
- token = _generate_and_test_bearer_token(client, keycloak_mock)
+ token = _generate_and_test_bearer_token(client, keycloak_oidc)
url = reverse("oidc-get-bearer-token")
@@ -441,14 +212,14 @@
@pytest.mark.django_db
-def test_oidc_revoke_bearer_tokens(client, keycloak_mock):
+def test_oidc_revoke_bearer_tokens(client, keycloak_oidc):
"""
User with correct credentials should be allowed to revoke tokens.
"""
nb_tokens = 3
for _ in range(nb_tokens):
- _generate_and_test_bearer_token(client, keycloak_mock)
+ _generate_and_test_bearer_token(client, keycloak_oidc)
url = reverse("oidc-revoke-bearer-tokens")
@@ -470,12 +241,12 @@
"""
url = reverse("oidc-profile")
login_url = reverse("oidc-login", query_params={"next_path": url})
- resp = check_html_get_response(client, url, status_code=302)
+ resp = check_http_get_response(client, url, status_code=302)
assert resp["location"] == login_url
@pytest.mark.django_db
-def test_oidc_profile_view(client, keycloak_mock):
+def test_oidc_profile_view(client, keycloak_oidc):
"""
Authenticated users should be able to request the profile page
and link to Keycloak account UI should be present.
@@ -483,7 +254,7 @@
url = reverse("oidc-profile")
kc_config = get_config()["keycloak"]
user_permissions = ["perm1", "perm2"]
- keycloak_mock.user_permissions = user_permissions
+ keycloak_oidc.user_permissions = user_permissions
client.login(code="", code_verifier="", redirect_uri="")
resp = check_html_get_response(
client, url, status_code=200, template_used="auth/profile.html"
diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py
--- a/swh/web/tests/conftest.py
+++ b/swh/web/tests/conftest.py
@@ -18,7 +18,6 @@
from django.core.cache import cache
from rest_framework.test import APIClient, APIRequestFactory
-from swh.auth.pytest_plugin import keycloak_mock_factory
from swh.model.hashutil import ALGORITHMS, hash_to_bytes
from swh.storage.algos.origin import origin_get_latest_visit_status
from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest
@@ -28,6 +27,8 @@
from swh.web.config import get_config
from swh.web.tests.data import get_tests_data, override_storages
+pytest_plugins = ["swh.auth.pytest_plugin"]
+
# Used to skip some tests
ctags_json_missing = (
shutil.which("ctags") is None
@@ -361,22 +362,15 @@
yield converters.from_swh(ctag, hashess={"id"})
-_keycloak_config = get_config()["keycloak"]
-
-_keycloak_mock = keycloak_mock_factory(
- server_url=_keycloak_config["server_url"],
- realm_name=_keycloak_config["realm_name"],
- client_id=OIDC_SWH_WEB_CLIENT_ID,
-)
+@pytest.fixture
+def keycloak_oidc(keycloak_oidc, mocker):
+ keycloak_config = get_config()["keycloak"]
+ keycloak_oidc.server_url = keycloak_config["server_url"]
+ keycloak_oidc.realm_name = keycloak_config["realm_name"]
+ keycloak_oidc.client_id = OIDC_SWH_WEB_CLIENT_ID
-@pytest.fixture
-def keycloak_mock(_keycloak_mock, mocker):
- for oidc_client_factory in (
- "swh.web.auth.views.get_oidc_client",
- "swh.web.auth.backends.get_oidc_client",
- ):
- mock_get_oidc_client = mocker.patch(oidc_client_factory)
- mock_get_oidc_client.return_value = _keycloak_mock
+ keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client")
+ keycloak_oidc_client.return_value = keycloak_oidc
- return _keycloak_mock
+ return keycloak_oidc
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Jul 2, 11:09 AM (1 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3228087
Attached To
D5367: auth: Use generic Django authentication backends from swh-auth
Event Timeline
Log In to Comment