diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
-swh.auth[django] >= 0.3.7
+swh.auth[django] >= 0.5.0
swh.core >= 0.0.95
swh.indexer >= 0.4.1
swh.model >= 0.5.0
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,7 +16,6 @@
prometheus-client
pybadges
pygments
-python-keycloak >= 0.19.0
python-magic >= 0.4.0
python-memcached
pyyaml
diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
deleted file mode 100644
--- a/swh/web/auth/backends.py
+++ /dev/null
@@ -1,190 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from datetime import datetime, timedelta
-import hashlib
-from typing import Any, Dict, Optional
-
-import sentry_sdk
-
-from django.core.cache import cache
-from django.http import HttpRequest
-from django.utils import timezone
-from rest_framework.authentication import BaseAuthentication
-from rest_framework.exceptions import AuthenticationFailed, ValidationError
-
-from swh.auth.django.models import OIDCUser
-from swh.web.auth.utils import get_oidc_client
-
-
-def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser:
- # compute an integer user identifier for Django User model
- # by concatenating all groups of the UUID4 user identifier
- # generated by Keycloak and converting it from hex to decimal
- user_id = int("".join(decoded_token["sub"].split("-")), 16)
-
- # create a Django user that will not be saved to database
- user = OIDCUser(
- id=user_id,
- username=decoded_token["preferred_username"],
- password="",
- first_name=decoded_token["given_name"],
- last_name=decoded_token["family_name"],
- email=decoded_token["email"],
- )
-
- # set is_staff user property based on groups
- if "groups" in decoded_token:
- user.is_staff = "/staff" in decoded_token["groups"]
-
- # extract user permissions if any
- resource_access = decoded_token.get("resource_access", {})
- client_resource_access = resource_access.get(get_oidc_client().client_id, {})
- user.permissions = set(client_resource_access.get("roles", []))
-
- # add user sub to custom User proxy model
- user.sub = decoded_token["sub"]
-
- return user
-
-
-def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser:
-
- oidc_client = get_oidc_client()
-
- # decode JWT token
- try:
- access_token = oidc_profile["access_token"]
- decoded_token = oidc_client.decode_token(access_token)
- # access token has expired or is invalid
- except Exception:
- # get a new access token from authentication provider
- oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"])
- # decode access token
- decoded_token = oidc_client.decode_token(oidc_profile["access_token"])
-
- # create OIDCUser from decoded token
- user = _oidc_user_from_decoded_token(decoded_token)
-
- # get authentication init datetime
- auth_datetime = datetime.fromtimestamp(decoded_token["iat"])
- exp_datetime = datetime.fromtimestamp(decoded_token["exp"])
-
- # compute OIDC tokens expiration date
- oidc_profile["expires_at"] = exp_datetime
- oidc_profile["refresh_expires_at"] = auth_datetime + timedelta(
- seconds=oidc_profile["refresh_expires_in"]
- )
-
- # add OIDC profile data to custom User proxy model
- for key, val in oidc_profile.items():
- if hasattr(user, key):
- setattr(user, key, val)
-
- # put OIDC profile in cache or update it after token renewal
- cache_key = f"oidc_user_{user.id}"
- if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]:
- # set cache key TTL as refresh token expiration time
- assert user.refresh_expires_at
- ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp())
-
- # save oidc_profile in cache
- cache.set(cache_key, oidc_profile, timeout=max(0, ttl))
-
- return user
-
-
-class OIDCAuthorizationCodePKCEBackend:
- def authenticate(
- self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str
- ) -> Optional[OIDCUser]:
-
- user = None
- try:
- # try to authenticate user with OIDC PKCE authorization code flow
- oidc_profile = get_oidc_client().authorization_code(
- code, redirect_uri, code_verifier=code_verifier
- )
-
- # create Django user
- user = _oidc_user_from_profile(oidc_profile)
-
- except Exception as e:
- sentry_sdk.capture_exception(e)
-
- return user
-
- def get_user(self, user_id: int) -> Optional[OIDCUser]:
- # get oidc profile from cache
- oidc_profile = cache.get(f"oidc_user_{user_id}")
- if oidc_profile:
- try:
- user = _oidc_user_from_profile(oidc_profile)
- # restore auth backend
- setattr(user, "backend", f"{__name__}.{self.__class__.__name__}")
- return user
- except Exception as e:
- sentry_sdk.capture_exception(e)
- return None
- else:
- return None
-
-
-class OIDCBearerTokenAuthentication(BaseAuthentication):
- def authenticate(self, request):
- auth_header = request.META.get("HTTP_AUTHORIZATION")
- if auth_header is None:
- return None
-
- try:
- auth_type, refresh_token = auth_header.split(" ", 1)
- except ValueError:
- raise AuthenticationFailed("Invalid HTTP authorization header format")
-
- if auth_type != "Bearer":
- raise AuthenticationFailed(
- (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).")
- )
- try:
-
- oidc_client = get_oidc_client()
-
- # compute a cache key from the token that does not exceed
- # memcached key size limit
- hasher = hashlib.sha1()
- hasher.update(refresh_token.encode("ascii"))
- cache_key = f"api_token_{hasher.hexdigest()}"
-
- # check if an access token is cached
- access_token = cache.get(cache_key)
-
- # attempt to decode access token
- try:
- decoded_token = oidc_client.decode_token(access_token)
- except Exception:
- # access token is None or it has expired
- decoded_token = None
-
- if access_token is None or decoded_token is None:
- # get a new access token from authentication provider
- access_token = oidc_client.refresh_token(refresh_token)["access_token"]
- # decode access token
- decoded_token = oidc_client.decode_token(access_token)
- # compute access token expiration
- exp = datetime.fromtimestamp(decoded_token["exp"])
- ttl = int(exp.timestamp() - timezone.now().timestamp())
- # save access token in cache while it is valid
- cache.set(cache_key, access_token, timeout=max(0, ttl))
-
- # create Django user
- user = _oidc_user_from_decoded_token(decoded_token)
- except UnicodeEncodeError as e:
- sentry_sdk.capture_exception(e)
- raise ValidationError("Invalid bearer token")
- except Exception as e:
- sentry_sdk.capture_exception(e)
- raise AuthenticationFailed(str(e))
-
- return user, None
diff --git a/swh/web/auth/middlewares.py b/swh/web/auth/middlewares.py
deleted file mode 100644
--- a/swh/web/auth/middlewares.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from django.contrib.auth import BACKEND_SESSION_KEY
-from django.http.response import HttpResponseRedirect
-
-from swh.web.common.utils import reverse
-
-
-class OIDCSessionExpiredMiddleware:
- """
- Middleware for checking OIDC user session expiration.
- """
-
- def __init__(self, get_response=None):
- self.get_response = get_response
- self.exempted_urls = [
- reverse(v)
- for v in ("logout", "oidc-login", "oidc-login-complete", "oidc-logout")
- ]
-
- def __call__(self, request):
- if (
- request.method != "GET"
- or request.user.is_authenticated
- or BACKEND_SESSION_KEY not in request.session
- or "OIDC" not in request.session[BACKEND_SESSION_KEY]
- or request.path in self.exempted_urls
- ):
- return self.get_response(request)
-
- # At that point, we know that a OIDC user was previously logged in
- # and his session has expired.
- # User will be redirected to logout page and a link will be offered to
- # login again.
- next_path = request.get_full_path()
- logout_url = reverse(
- "logout", query_params={"next_path": next_path, "remote_user": 1}
- )
- return HttpResponseRedirect(logout_url)
diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py
--- a/swh/web/auth/utils.py
+++ b/swh/web/auth/utils.py
@@ -4,45 +4,12 @@
# See top-level LICENSE file for more information
from base64 import urlsafe_b64encode
-import hashlib
-import secrets
-from typing import Dict, Tuple
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
-from swh.auth.keycloak import KeycloakOpenIDConnect
-from swh.web.config import get_config
-
-
-def gen_oidc_pkce_codes() -> Tuple[str, str]:
- """
- Generates a code verifier and a code challenge to be used
- with the OpenID Connect authorization code flow with PKCE
- ("Proof Key for Code Exchange", see https://tools.ietf.org/html/rfc7636).
-
- PKCE replaces the static secret used in the standard authorization
- code flow with a temporary one-time challenge, making it feasible
- to use in public clients.
-
- The implementation is inspired from that blog post:
- https://www.stefaanlippens.net/oauth-code-flow-pkce.html
- """
- # generate a code verifier which is a long enough random alphanumeric
- # string, only to be used "client side"
- code_verifier_str = secrets.token_urlsafe(60)
-
- # create the PKCE code challenge by hashing the code verifier with SHA256
- # and encoding the result in URL-safe base64 (without padding)
- code_challenge = hashlib.sha256(code_verifier_str.encode("ascii")).digest()
- code_challenge_str = urlsafe_b64encode(code_challenge).decode("ascii")
- code_challenge_str = code_challenge_str.replace("=", "")
-
- return code_verifier_str, code_challenge_str
-
-
OIDC_SWH_WEB_CLIENT_ID = "swh-web"
@@ -101,28 +68,3 @@
The decrypted data
"""
return _get_fernet(password, salt).decrypt(data)
-
-
-# stores instances of KeycloakOpenIDConnect class
-# dict keys are (realm_name, client_id) tuples
-_keycloak_oidc: Dict[str, KeycloakOpenIDConnect] = {}
-
-
-def get_oidc_client(client_id: str = OIDC_SWH_WEB_CLIENT_ID) -> KeycloakOpenIDConnect:
- """
- Instantiate a KeycloakOpenIDConnect class for a given client in the
- SoftwareHeritage realm.
-
- Args:
- client_id: client identifier in the SoftwareHeritage realm
-
- Returns:
- An object to ease the interaction with the Keycloak server
- """
- keycloak_config = get_config()["keycloak"]
-
- if client_id not in _keycloak_oidc:
- _keycloak_oidc[client_id] = KeycloakOpenIDConnect(
- keycloak_config["server_url"], keycloak_config["realm_name"], client_id
- )
- return _keycloak_oidc[client_id]
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
--- a/swh/web/auth/views.py
+++ b/swh/web/auth/views.py
@@ -5,14 +5,11 @@
import json
from typing import Any, Dict, cast
-import uuid
from cryptography.fernet import InvalidToken
from django.conf.urls import url
-from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.decorators import login_required
-from django.core.cache import cache
from django.core.paginator import Paginator
from django.http import HttpRequest
from django.http.response import (
@@ -25,126 +22,21 @@
from django.views.decorators.http import require_http_methods
from swh.auth.django.models import OIDCUser
+from swh.auth.django.utils import keycloak_oidc_client
+from swh.auth.django.views import get_oidc_login_data, oidc_login_view
+from swh.auth.django.views import urlpatterns as auth_urlpatterns
from swh.web.auth.models import OIDCUserOfflineTokens
-from swh.web.auth.utils import (
- decrypt_data,
- encrypt_data,
- gen_oidc_pkce_codes,
- get_oidc_client,
-)
-from swh.web.common.exc import BadInputExc, ForbiddenExc
+from swh.web.auth.utils import decrypt_data, encrypt_data
+from swh.web.common.exc import ForbiddenExc
from swh.web.common.utils import reverse
from swh.web.config import get_config
-def _oidc_login(request: HttpRequest, redirect_uri: str, scope: str = "openid"):
- # generate a CSRF token
- state = str(uuid.uuid4())
-
- code_verifier, code_challenge = gen_oidc_pkce_codes()
-
- request.session["login_data"] = {
- "code_verifier": code_verifier,
- "state": state,
- "redirect_uri": redirect_uri,
- "next_path": request.GET.get("next_path", ""),
- }
-
- authorization_url_params = {
- "state": state,
- "code_challenge": code_challenge,
- "code_challenge_method": "S256",
- "scope": scope,
- }
-
- oidc_client = get_oidc_client()
- authorization_url = oidc_client.authorization_url(
- redirect_uri, **authorization_url_params
- )
-
- return HttpResponseRedirect(authorization_url)
-
-
-def oidc_login(request: HttpRequest) -> HttpResponse:
- """
- Django view to initiate login process using OpenID Connect.
- """
-
- redirect_uri = reverse("oidc-login-complete", request=request)
-
- return _oidc_login(request, redirect_uri=redirect_uri)
-
-
-def _get_login_data(request: HttpRequest) -> Dict[str, Any]:
- if "login_data" not in request.session:
- raise Exception("Login process has not been initialized.")
-
- return request.session["login_data"]
-
-
-def _check_login_data(request: HttpRequest, login_data: Dict[str, Any]):
-
- if "code" not in request.GET or "state" not in request.GET:
- raise BadInputExc("Missing query parameters for authentication.")
-
- # get CSRF token returned by OIDC server
- state = request.GET["state"]
-
- if state != login_data["state"]:
- raise BadInputExc("Wrong CSRF token, aborting login process.")
-
-
-def oidc_login_complete(request: HttpRequest) -> HttpResponse:
- """
- Django view to finalize login process using OpenID Connect.
- """
- login_data = _get_login_data(request)
- next_path = login_data["next_path"] or request.build_absolute_uri("/")
-
- if "error" in request.GET:
- raise Exception(request.GET["error"])
-
- _check_login_data(request, login_data)
-
- user = authenticate(
- request=request,
- code=request.GET["code"],
- code_verifier=login_data["code_verifier"],
- redirect_uri=login_data["redirect_uri"],
- )
-
- if user is None:
- raise Exception("User authentication failed.")
-
- login(request, user)
-
- return HttpResponseRedirect(next_path)
-
-
-def oidc_logout(request: HttpRequest) -> HttpResponse:
- """
- Django view to logout using OpenID Connect.
- """
- user = request.user
- logout(request)
- if hasattr(user, "refresh_token"):
- oidc_client = get_oidc_client()
- user = cast(OIDCUser, user)
- refresh_token = cast(str, user.refresh_token)
- # end OpenID Connect session
- oidc_client.logout(refresh_token)
- # remove user data from cache
- cache.delete(f"oidc_user_{user.id}")
-
- logout_url = reverse("logout", query_params={"remote_user": 1})
- return HttpResponseRedirect(request.build_absolute_uri(logout_url))
-
-
def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse:
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser):
return HttpResponseForbidden()
redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request)
- return _oidc_login(
+ return oidc_login_view(
request, redirect_uri=redirect_uri, scope="openid offline_access"
)
@@ -155,9 +47,8 @@
if "error" in request.GET:
raise Exception(request.GET["error"])
- oidc_client = get_oidc_client()
- login_data = _get_login_data(request)
- _check_login_data(request, login_data)
+ login_data = get_oidc_login_data(request)
+ oidc_client = keycloak_oidc_client()
oidc_profile = oidc_client.authorization_code(
code=request.GET["code"],
code_verifier=login_data["code_verifier"],
@@ -227,7 +118,7 @@
secret = get_config()["secret_key"].encode()
salt = user.sub.encode()
decrypted_token = decrypt_data(token_data.offline_token, secret, salt)
- oidc_client = get_oidc_client()
+ oidc_client = keycloak_oidc_client()
oidc_client.logout(decrypted_token.decode("ascii"))
token_data.delete()
return HttpResponse(status=200)
@@ -240,10 +131,7 @@
return render(request, "auth/profile.html")
-urlpatterns = [
- url(r"^oidc/login/$", oidc_login, name="oidc-login"),
- url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"),
- url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"),
+urlpatterns = auth_urlpatterns + [
url(
r"^oidc/generate-bearer-token/$",
oidc_generate_bearer_token,
diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py
--- a/swh/web/settings/common.py
+++ b/swh/web/settings/common.py
@@ -12,6 +12,7 @@
import sys
from typing import Any, Dict
+from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID
from swh.web.config import get_config
swh_web_config = get_config()
@@ -57,7 +58,7 @@
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
- "swh.web.auth.middlewares.OIDCSessionExpiredMiddleware",
+ "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
"swh.web.common.middlewares.ThrottlingHeadersMiddleware",
@@ -166,7 +167,7 @@
"DEFAULT_THROTTLE_RATES": throttle_rates,
"DEFAULT_AUTHENTICATION_CLASSES": [
"rest_framework.authentication.SessionAuthentication",
- "swh.web.auth.backends.OIDCBearerTokenAuthentication",
+ "swh.auth.django.backends.OIDCBearerTokenAuthentication",
],
"EXCEPTION_HANDLER": "swh.web.api.apiresponse.error_response_handler",
}
@@ -278,5 +279,10 @@
AUTHENTICATION_BACKENDS = [
"django.contrib.auth.backends.ModelBackend",
- "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend",
+ "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend",
]
+
+SWH_AUTH_SERVER_URL = swh_web_config["keycloak"]["server_url"]
+SWH_AUTH_REALM_NAME = swh_web_config["keycloak"]["realm_name"]
+SWH_AUTH_CLIENT_ID = OIDC_SWH_WEB_CLIENT_ID
+SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW = "logout"
diff --git a/swh/web/templates/layout.html b/swh/web/templates/layout.html
--- a/swh/web/templates/layout.html
+++ b/swh/web/templates/layout.html
@@ -118,7 +118,7 @@
Logged in as
{% if 'OIDC' in user.backend %}
{{ user.username }},
- logout
+ logout
{% else %}
{{ user.username }},
logout
diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/tests/api/views/test_graph.py
--- a/swh/web/tests/api/views/test_graph.py
+++ b/swh/web/tests/api/views/test_graph.py
@@ -38,21 +38,21 @@
check_http_get_response(api_client, url, status_code=401)
-def _authenticate_graph_user(api_client, keycloak_mock):
- keycloak_mock.user_permissions = [API_GRAPH_PERM]
- oidc_profile = keycloak_mock.login()
+def _authenticate_graph_user(api_client, keycloak_oidc):
+ keycloak_oidc.user_permissions = [API_GRAPH_PERM]
+ oidc_profile = keycloak_oidc.login()
api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}")
-def test_graph_endpoint_needs_permission(api_client, keycloak_mock, requests_mock):
+def test_graph_endpoint_needs_permission(api_client, keycloak_oidc, requests_mock):
graph_query = "stats"
url = reverse("api-1-graph", url_args={"graph_query": graph_query})
- oidc_profile = keycloak_mock.login()
+ oidc_profile = keycloak_oidc.login()
api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}")
check_http_get_response(api_client, url, status_code=403)
- _authenticate_graph_user(api_client, keycloak_mock)
+ _authenticate_graph_user(api_client, keycloak_oidc)
requests_mock.get(
get_config()["graph"]["server_url"] + graph_query,
json={},
@@ -61,8 +61,8 @@
check_http_get_response(api_client, url, status_code=200)
-def test_graph_text_plain_response(api_client, keycloak_mock, requests_mock):
- _authenticate_graph_user(api_client, keycloak_mock)
+def test_graph_text_plain_response(api_client, keycloak_oidc, requests_mock):
+ _authenticate_graph_user(api_client, keycloak_oidc)
graph_query = "leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323"
@@ -114,8 +114,8 @@
}
-def test_graph_json_response(api_client, keycloak_mock, requests_mock):
- _authenticate_graph_user(api_client, keycloak_mock)
+def test_graph_json_response(api_client, keycloak_oidc, requests_mock):
+ _authenticate_graph_user(api_client, keycloak_oidc)
graph_query = "stats"
@@ -132,8 +132,8 @@
assert resp.content == json.dumps(_response_json).encode()
-def test_graph_ndjson_response(api_client, keycloak_mock, requests_mock):
- _authenticate_graph_user(api_client, keycloak_mock)
+def test_graph_ndjson_response(api_client, keycloak_oidc, requests_mock):
+ _authenticate_graph_user(api_client, keycloak_oidc)
graph_query = "visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb"
@@ -167,7 +167,7 @@
@given(origin())
def test_graph_response_resolve_origins(
- archive_data, api_client, keycloak_mock, requests_mock, origin
+ archive_data, api_client, keycloak_oidc, requests_mock, origin
):
hasher = hashlib.sha1()
hasher.update(origin["url"].encode())
@@ -182,7 +182,7 @@
)
)
- _authenticate_graph_user(api_client, keycloak_mock)
+ _authenticate_graph_user(api_client, keycloak_oidc)
for graph_query, response_text, content_type in (
(
@@ -238,9 +238,9 @@
def test_graph_response_resolve_origins_nothing_to_do(
- api_client, keycloak_mock, requests_mock
+ api_client, keycloak_oidc, requests_mock
):
- _authenticate_graph_user(api_client, keycloak_mock)
+ _authenticate_graph_user(api_client, keycloak_oidc)
graph_query = "stats"
diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py
deleted file mode 100644
--- a/swh/web/tests/auth/test_api_auth.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import pytest
-
-from django.contrib.auth.models import AnonymousUser, User
-
-from swh.auth.django.models import OIDCUser
-from swh.web.common.utils import reverse
-from swh.web.tests.utils import check_api_get_responses, check_http_get_response
-
-
-@pytest.mark.django_db
-def test_drf_django_session_auth_success(keycloak_mock, client):
- """
- Check user gets authenticated when querying the web api
- through a web browser.
- """
- url = reverse("api-1-stat-counters")
-
- client.login(code="", code_verifier="", redirect_uri="")
-
- response = check_http_get_response(client, url, status_code=200)
- request = response.wsgi_request
-
- # user should be authenticated
- assert isinstance(request.user, OIDCUser)
-
- # check remoter used has not been saved to Django database
- with pytest.raises(User.DoesNotExist):
- User.objects.get(username=request.user.username)
-
-
-@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_success(keycloak_mock, api_client):
- """
- Check user gets authenticated when querying the web api
- through an HTTP client using bearer token authentication.
- """
- url = reverse("api-1-stat-counters")
-
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
-
- api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
-
- response = check_api_get_responses(api_client, url, status_code=200)
- request = response.wsgi_request
-
- # user should be authenticated
- assert isinstance(request.user, OIDCUser)
-
- # check remoter used has not been saved to Django database
- with pytest.raises(User.DoesNotExist):
- User.objects.get(username=request.user.username)
-
-
-@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_failure(keycloak_mock, api_client):
- url = reverse("api-1-stat-counters")
-
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
-
- # check for failed authentication but with expected token format
- keycloak_mock.set_auth_success(False)
- api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
-
- response = check_api_get_responses(api_client, url, status_code=403)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
-
- # check for failed authentication when token format is invalid
- api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà")
-
- response = check_api_get_responses(api_client, url, status_code=400)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
-
-
-def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_mock, api_client):
- url = reverse("api-1-stat-counters")
-
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
-
- # missing authorization type
- api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}")
-
- response = check_api_get_responses(api_client, url, status_code=403)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
-
- # invalid authorization type
- api_client.credentials(HTTP_AUTHORIZATION="Foo token")
-
- response = check_api_get_responses(api_client, url, status_code=403)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
deleted file mode 100644
--- a/swh/web/tests/auth/test_backends.py
+++ /dev/null
@@ -1,271 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from datetime import datetime, timedelta
-from unittest.mock import Mock
-
-import pytest
-
-from django.conf import settings
-from django.contrib.auth import authenticate, get_backends
-from rest_framework.exceptions import AuthenticationFailed
-
-from swh.auth.django.models import OIDCUser
-from swh.web.auth.backends import OIDCBearerTokenAuthentication
-from swh.web.common.utils import reverse
-
-
-def _authenticate_user(request_factory):
- request = request_factory.get(reverse("oidc-login-complete"))
-
- return authenticate(
- request=request,
- code="some-code",
- code_verifier="some-code-verifier",
- redirect_uri="https://localhost:5004",
- )
-
-
-def _check_authenticated_user(user, decoded_token, kc_oidc_mock):
- assert user is not None
- assert isinstance(user, OIDCUser)
- assert user.id != 0
- assert user.username == decoded_token["preferred_username"]
- assert user.password == ""
- assert user.first_name == decoded_token["given_name"]
- assert user.last_name == decoded_token["family_name"]
- assert user.email == decoded_token["email"]
- assert user.is_staff == ("/staff" in decoded_token["groups"])
- assert user.sub == decoded_token["sub"]
- resource_access = decoded_token.get("resource_access", {})
- resource_access_client = resource_access.get(kc_oidc_mock, {})
- assert user.permissions == set(resource_access_client.get("roles", []))
-
-
-@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_success(keycloak_mock, request_factory):
- """
- Checks successful login based on OpenID Connect with PKCE extension
- Django authentication backend (login from Web UI).
- """
- keycloak_mock.user_groups = ["/staff"]
-
- oidc_profile = keycloak_mock.login()
- user = _authenticate_user(request_factory)
-
- decoded_token = keycloak_mock.decode_token(user.access_token)
- _check_authenticated_user(user, decoded_token, keycloak_mock)
-
- auth_datetime = datetime.fromtimestamp(decoded_token["iat"])
- exp_datetime = datetime.fromtimestamp(decoded_token["exp"])
- refresh_exp_datetime = auth_datetime + timedelta(
- seconds=oidc_profile["refresh_expires_in"]
- )
-
- assert user.access_token == oidc_profile["access_token"]
- assert user.expires_at == exp_datetime
- assert user.id_token == oidc_profile["id_token"]
- assert user.refresh_token == oidc_profile["refresh_token"]
- assert user.refresh_expires_at == refresh_exp_datetime
- assert user.scope == oidc_profile["scope"]
- assert user.session_state == oidc_profile["session_state"]
-
- backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend"
- assert user.backend == backend_path
- backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path)
- assert get_backends()[backend_idx].get_user(user.id) == user
-
-
-@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_failure(keycloak_mock, request_factory):
- """
- Checks failed login based on OpenID Connect with PKCE extension Django
- authentication backend (login from Web UI).
- """
- keycloak_mock.set_auth_success(False)
-
- user = _authenticate_user(request_factory)
-
- assert user is None
-
-
-@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_refresh_token_success(
- keycloak_mock, request_factory
-):
- """
- Checks access token renewal success using refresh token.
- """
-
- oidc_profile = keycloak_mock.login()
- decoded_token = keycloak_mock.decode_token(oidc_profile["access_token"])
- new_access_token = "new_access_token"
-
- def _refresh_token(refresh_token):
- oidc_profile = dict(keycloak_mock.login())
- oidc_profile["access_token"] = new_access_token
- return oidc_profile
-
- def _decode_token(access_token):
- if access_token != new_access_token:
- raise Exception("access token token has expired")
- else:
- return decoded_token
-
- keycloak_mock.decode_token = Mock()
- keycloak_mock.decode_token.side_effect = _decode_token
- keycloak_mock.refresh_token.side_effect = _refresh_token
-
- user = _authenticate_user(request_factory)
-
- oidc_profile = keycloak_mock.login()
- keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"])
-
- assert user is not None
-
-
-@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_refresh_token_failure(
- keycloak_mock, request_factory
-):
- """
- Checks access token renewal failure using refresh token.
- """
-
- def _refresh_token(refresh_token):
- raise Exception("OIDC session has expired")
-
- def _decode_token(access_token):
- raise Exception("access token token has expired")
-
- keycloak_mock.decode_token = Mock()
- keycloak_mock.decode_token.side_effect = _decode_token
- keycloak_mock.refresh_token.side_effect = _refresh_token
-
- user = _authenticate_user(request_factory)
-
- oidc_profile = keycloak_mock.login()
- keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"])
-
- assert user is None
-
-
-@pytest.mark.django_db
-def test_oidc_code_pkce_auth_backend_permissions(keycloak_mock, request_factory):
- """
- Checks that a permission defined with OpenID Connect is correctly mapped
- to a Django one when logging from Web UI.
- """
- permission = "webapp.some-permission"
- keycloak_mock.user_permissions = [permission]
- user = _authenticate_user(request_factory)
- assert user.has_perm(permission)
- assert user.get_all_permissions() == {permission}
- assert user.get_group_permissions() == {permission}
- assert user.has_module_perms("webapp")
- assert not user.has_module_perms("foo")
-
-
-@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_backend_success(keycloak_mock, api_request_factory):
- """
- Checks successful login based on OpenID Connect bearer token Django REST
- Framework authentication backend (Web API login).
- """
- url = reverse("api-1-stat-counters")
- drf_auth_backend = OIDCBearerTokenAuthentication()
-
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
- access_token = oidc_profile["access_token"]
-
- decoded_token = keycloak_mock.decode_token(access_token)
-
- request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
-
- user, _ = drf_auth_backend.authenticate(request)
- _check_authenticated_user(user, decoded_token, keycloak_mock)
- # oidc_profile is not filled when authenticating through bearer token
- assert hasattr(user, "access_token") and user.access_token is None
-
-
-@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_mock, api_request_factory):
- """
- Checks failed login based on OpenID Connect bearer token Django REST
- Framework authentication backend (Web API login).
- """
- url = reverse("api-1-stat-counters")
- drf_auth_backend = OIDCBearerTokenAuthentication()
-
- oidc_profile = keycloak_mock.login()
-
- # simulate a failed authentication with a bearer token in expected format
- keycloak_mock.set_auth_success(False)
-
- refresh_token = oidc_profile["refresh_token"]
-
- request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
-
- with pytest.raises(AuthenticationFailed):
- drf_auth_backend.authenticate(request)
-
- # simulate a failed authentication with an invalid bearer token format
- request = api_request_factory.get(
- url, HTTP_AUTHORIZATION="Bearer invalid-token-format"
- )
-
- with pytest.raises(AuthenticationFailed):
- drf_auth_backend.authenticate(request)
-
-
-def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_mock, api_request_factory):
- """
- Checks failed login based on OpenID Connect bearer token Django REST
- Framework authentication backend (Web API login) due to invalid
- authorization header value.
- """
- url = reverse("api-1-stat-counters")
- drf_auth_backend = OIDCBearerTokenAuthentication()
-
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
-
- # Invalid authorization type
- request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token")
-
- with pytest.raises(AuthenticationFailed):
- drf_auth_backend.authenticate(request)
-
- # Missing authorization type
- request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}")
-
- with pytest.raises(AuthenticationFailed):
- drf_auth_backend.authenticate(request)
-
-
-@pytest.mark.django_db
-def test_drf_oidc_bearer_token_auth_backend_permissions(
- keycloak_mock, api_request_factory
-):
- """
- Checks that a permission defined with OpenID Connect is correctly mapped
- to a Django one when using bearer token authentication.
- """
- permission = "webapp.some-permission"
- keycloak_mock.user_permissions = [permission]
-
- drf_auth_backend = OIDCBearerTokenAuthentication()
- oidc_profile = keycloak_mock.login()
- refresh_token = oidc_profile["refresh_token"]
- url = reverse("api-1-stat-counters")
- request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
- user, _ = drf_auth_backend.authenticate(request)
-
- assert user.has_perm(permission)
- assert user.get_all_permissions() == {permission}
- assert user.get_group_permissions() == {permission}
- assert user.has_module_perms("webapp")
- assert not user.has_module_perms("foo")
diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py
deleted file mode 100644
--- a/swh/web/tests/auth/test_middlewares.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU Affero General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-import pytest
-
-from django.core.cache import cache
-from django.test import modify_settings
-
-from swh.web.common.utils import reverse
-from swh.web.tests.utils import check_html_get_response
-
-
-@pytest.mark.django_db
-@modify_settings(
- MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]}
-)
-def test_oidc_session_expired_middleware_disabled(client, keycloak_mock):
- # authenticate user
-
- client.login(code="", code_verifier="", redirect_uri="")
- keycloak_mock.authorization_code.assert_called()
-
- url = reverse("swh-web-homepage")
-
- # visit url first to get user from response
- response = check_html_get_response(client, url, status_code=200)
-
- # simulate OIDC session expiration
- cache.delete(f"oidc_user_{response.wsgi_request.user.id}")
-
- # no redirection when session has expired
- check_html_get_response(client, url, status_code=200)
-
-
-@pytest.mark.django_db
-def test_oidc_session_expired_middleware_enabled(client, keycloak_mock):
- # authenticate user
- client.login(code="", code_verifier="", redirect_uri="")
- keycloak_mock.authorization_code.assert_called()
-
- url = reverse("swh-web-homepage")
-
- # visit url first to get user from response
- response = check_html_get_response(client, url, status_code=200)
-
- # simulate OIDC session expiration
- cache.delete(f"oidc_user_{response.wsgi_request.user.id}")
-
- # should redirect to logout page
- resp = check_html_get_response(client, url, status_code=302)
- silent_refresh_url = reverse(
- "logout", query_params={"next_path": url, "remote_user": 1}
- )
- assert resp["location"] == silent_refresh_url
diff --git a/swh/web/tests/auth/test_utils.py b/swh/web/tests/auth/test_utils.py
--- a/swh/web/tests/auth/test_utils.py
+++ b/swh/web/tests/auth/test_utils.py
@@ -3,40 +3,11 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from base64 import urlsafe_b64encode
-import hashlib
-import re
from cryptography.fernet import InvalidToken
import pytest
-from swh.web.auth.utils import decrypt_data, encrypt_data, gen_oidc_pkce_codes
-
-
-def test_gen_oidc_pkce_codes():
- """
- Check generated PKCE codes respect the specification
- (see https://tools.ietf.org/html/rfc7636#section-4.1)
- """
- code_verifier, code_challenge = gen_oidc_pkce_codes()
-
- # check the code verifier only contains allowed characters
- assert re.match(r"[a-zA-Z0-9-\._~]+", code_verifier)
-
- # check minimum and maximum authorized length for the
- # code verifier
- assert len(code_verifier) >= 43
- assert len(code_verifier) <= 128
-
- # compute code challenge from code verifier
- challenge = hashlib.sha256(code_verifier.encode("ascii")).digest()
- challenge = urlsafe_b64encode(challenge).decode("ascii")
- challenge = challenge.replace("=", "")
-
- # check base64 padding is not present
- assert not code_challenge[-1].endswith("=")
- # check code challenge is valid
- assert code_challenge == challenge
+from swh.web.auth.utils import decrypt_data, encrypt_data
def test_encrypt_decrypt_data_ok():
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py
--- a/swh/web/tests/auth/test_views.py
+++ b/swh/web/tests/auth/test_views.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -9,10 +9,8 @@
import pytest
-from django.contrib.auth.models import AnonymousUser, User
from django.http import QueryDict
-from swh.auth.django.models import OIDCUser
from swh.web.auth.models import OIDCUserOfflineTokens
from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data
from swh.web.common.utils import reverse
@@ -27,11 +25,11 @@
def _check_oidc_login_code_flow_data(
- request, response, kc_oidc_mock, redirect_uri, scope="openid"
+ request, response, keycloak_oidc, redirect_uri, scope="openid"
):
parsed_url = urlparse(response["location"])
- authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"]
+ authorization_url = keycloak_oidc.well_known()["authorization_endpoint"]
query_dict = QueryDict(parsed_url.query)
# check redirect url is valid
@@ -59,233 +57,6 @@
return login_data
-@pytest.mark.django_db
-def test_oidc_login_views_success(client, keycloak_mock):
- """
- Simulate a successful login authentication with OpenID Connect
- authorization code flow with PKCE.
- """
- # user initiates login process
- login_url = reverse("oidc-login")
-
- # should redirect to Keycloak authentication page in order
- # for a user to login with its username / password
- response = check_html_get_response(client, login_url, status_code=302)
- request = response.wsgi_request
-
- assert isinstance(request.user, AnonymousUser)
-
- login_data = _check_oidc_login_code_flow_data(
- request,
- response,
- keycloak_mock,
- redirect_uri=reverse("oidc-login-complete", request=request),
- )
-
- # once a user has identified himself in Keycloak, he is
- # redirected to the 'oidc-login-complete' view to
- # login in Django.
-
- # generate authorization code / session state in the same
- # manner as Keycloak
- code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}"
- session_state = str(uuid.uuid4())
-
- login_complete_url = reverse(
- "oidc-login-complete",
- query_params={
- "code": code,
- "state": login_data["state"],
- "session_state": session_state,
- },
- )
-
- # login process finalization, should redirect to root url by default
- response = check_html_get_response(client, login_complete_url, status_code=302)
- request = response.wsgi_request
-
- assert response["location"] == request.build_absolute_uri("/")
-
- # user should be authenticated
- assert isinstance(request.user, OIDCUser)
-
- # check remote user has not been saved to Django database
- with pytest.raises(User.DoesNotExist):
- User.objects.get(username=request.user.username)
-
-
-@pytest.mark.django_db
-def test_oidc_logout_view_success(client, keycloak_mock):
- """
- Simulate a successful logout operation with OpenID Connect.
- """
- # login our test user
- client.login(code="", code_verifier="", redirect_uri="")
- keycloak_mock.authorization_code.assert_called()
-
- # user initiates logout
- oidc_logout_url = reverse("oidc-logout")
-
- # should redirect to logout page
- response = check_html_get_response(client, oidc_logout_url, status_code=302)
- request = response.wsgi_request
-
- logout_url = reverse("logout", query_params={"remote_user": 1})
- assert response["location"] == request.build_absolute_uri(logout_url)
-
- # should have been logged out in Keycloak
- oidc_profile = keycloak_mock.login()
- keycloak_mock.logout.assert_called_with(oidc_profile["refresh_token"])
-
- # check effective logout in Django
- assert isinstance(request.user, AnonymousUser)
-
-
-@pytest.mark.django_db
-def test_oidc_login_view_failure(client, keycloak_mock):
- """
- Simulate a failed authentication with OpenID Connect.
- """
- keycloak_mock.set_auth_success(False)
-
- # user initiates login process
- login_url = reverse("oidc-login")
- # should render an error page
- response = check_html_get_response(
- client, login_url, status_code=500, template_used="error.html"
- )
- request = response.wsgi_request
-
- # no users should be logged in
- assert isinstance(request.user, AnonymousUser)
-
-
-# Simulate possible errors with OpenID Connect in the login complete view.
-
-
-def test_oidc_login_complete_view_no_login_data(client, mocker):
- # user initiates login process
- login_url = reverse("oidc-login-complete")
- # should render an error page
- response = check_html_get_response(
- client, login_url, status_code=500, template_used="error.html"
- )
-
- assert_contains(
- response, "Login process has not been initialized.", status_code=500
- )
-
-
-def test_oidc_login_complete_view_missing_parameters(client, mocker):
- # simulate login process has been initialized
- session = client.session
- session["login_data"] = {
- "code_verifier": "",
- "state": str(uuid.uuid4()),
- "redirect_uri": "",
- "next_path": "",
- }
- session.save()
-
- # user initiates login process
- login_url = reverse("oidc-login-complete")
- # should render an error page
- response = check_html_get_response(
- client, login_url, status_code=400, template_used="error.html"
- )
- request = response.wsgi_request
- assert_contains(
- response, "Missing query parameters for authentication.", status_code=400
- )
-
- # no user should be logged in
- assert isinstance(request.user, AnonymousUser)
-
-
-def test_oidc_login_complete_wrong_csrf_token(client, keycloak_mock):
- # simulate login process has been initialized
- session = client.session
- session["login_data"] = {
- "code_verifier": "",
- "state": str(uuid.uuid4()),
- "redirect_uri": "",
- "next_path": "",
- }
- session.save()
-
- # user initiates login process
- login_url = reverse(
- "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"}
- )
-
- # should render an error page
- response = check_html_get_response(
- client, login_url, status_code=400, template_used="error.html"
- )
- request = response.wsgi_request
- assert_contains(
- response, "Wrong CSRF token, aborting login process.", status_code=400
- )
-
- # no user should be logged in
- assert isinstance(request.user, AnonymousUser)
-
-
-@pytest.mark.django_db
-def test_oidc_login_complete_wrong_code_verifier(client, keycloak_mock):
- keycloak_mock.set_auth_success(False)
-
- # simulate login process has been initialized
- session = client.session
- session["login_data"] = {
- "code_verifier": "",
- "state": str(uuid.uuid4()),
- "redirect_uri": "",
- "next_path": "",
- }
- session.save()
-
- # check authentication error is reported
- login_url = reverse(
- "oidc-login-complete",
- query_params={"code": "some-code", "state": session["login_data"]["state"]},
- )
-
- # should render an error page
- response = check_html_get_response(
- client, login_url, status_code=500, template_used="error.html"
- )
- request = response.wsgi_request
- assert_contains(response, "User authentication failed.", status_code=500)
-
- # no user should be logged in
- assert isinstance(request.user, AnonymousUser)
-
-
-@pytest.mark.django_db
-def test_oidc_logout_view_failure(client, keycloak_mock):
- """
- Simulate a failed logout operation with OpenID Connect.
- """
- # login our test user
- client.login(code="", code_verifier="", redirect_uri="")
-
- err_msg = "Authentication server error"
- keycloak_mock.logout.side_effect = Exception(err_msg)
-
- # user initiates logout process
- logout_url = reverse("oidc-logout")
- # should render an error page
- response = check_html_get_response(
- client, logout_url, status_code=500, template_used="error.html"
- )
- request = response.wsgi_request
- assert_contains(response, err_msg, status_code=500)
-
- # user should be logged out from Django anyway
- assert isinstance(request.user, AnonymousUser)
-
-
def test_view_rendering_when_user_not_set_in_request(request_factory):
request = request_factory.get("/")
# Django RequestFactory do not set any user by default
@@ -341,7 +112,7 @@
)
nb_tokens = len(OIDCUserOfflineTokens.objects.all())
- response = check_html_get_response(client, token_complete_url, status_code=302)
+ response = check_http_get_response(client, token_complete_url, status_code=302)
request = response.wsgi_request
# check token has been generated and saved encrypted to database
@@ -360,12 +131,12 @@
@pytest.mark.django_db
-def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_mock):
+def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_oidc):
"""
Authenticated user should be able to generate a bearer token using OIDC
Authorization Code Flow.
"""
- _generate_and_test_bearer_token(client, keycloak_mock)
+ _generate_and_test_bearer_token(client, keycloak_oidc)
def test_oidc_list_bearer_tokens_anonymous_user(client):
@@ -379,14 +150,14 @@
@pytest.mark.django_db
-def test_oidc_list_bearer_tokens(client, keycloak_mock):
+def test_oidc_list_bearer_tokens(client, keycloak_oidc):
"""
User with correct credentials should be allowed to list his tokens.
"""
nb_tokens = 3
for _ in range(nb_tokens):
- _generate_and_test_bearer_token(client, keycloak_mock)
+ _generate_and_test_bearer_token(client, keycloak_oidc)
url = reverse(
"oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10}
@@ -411,14 +182,14 @@
@pytest.mark.django_db
-def test_oidc_get_bearer_token(client, keycloak_mock):
+def test_oidc_get_bearer_token(client, keycloak_oidc):
"""
User with correct credentials should be allowed to display a token.
"""
nb_tokens = 3
for i in range(nb_tokens):
- token = _generate_and_test_bearer_token(client, keycloak_mock)
+ token = _generate_and_test_bearer_token(client, keycloak_oidc)
url = reverse("oidc-get-bearer-token")
@@ -441,14 +212,14 @@
@pytest.mark.django_db
-def test_oidc_revoke_bearer_tokens(client, keycloak_mock):
+def test_oidc_revoke_bearer_tokens(client, keycloak_oidc):
"""
User with correct credentials should be allowed to revoke tokens.
"""
nb_tokens = 3
for _ in range(nb_tokens):
- _generate_and_test_bearer_token(client, keycloak_mock)
+ _generate_and_test_bearer_token(client, keycloak_oidc)
url = reverse("oidc-revoke-bearer-tokens")
@@ -470,12 +241,12 @@
"""
url = reverse("oidc-profile")
login_url = reverse("oidc-login", query_params={"next_path": url})
- resp = check_html_get_response(client, url, status_code=302)
+ resp = check_http_get_response(client, url, status_code=302)
assert resp["location"] == login_url
@pytest.mark.django_db
-def test_oidc_profile_view(client, keycloak_mock):
+def test_oidc_profile_view(client, keycloak_oidc):
"""
Authenticated users should be able to request the profile page
and link to Keycloak account UI should be present.
@@ -483,7 +254,7 @@
url = reverse("oidc-profile")
kc_config = get_config()["keycloak"]
user_permissions = ["perm1", "perm2"]
- keycloak_mock.user_permissions = user_permissions
+ keycloak_oidc.user_permissions = user_permissions
client.login(code="", code_verifier="", redirect_uri="")
resp = check_html_get_response(
client, url, status_code=200, template_used="auth/profile.html"
diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py
--- a/swh/web/tests/conftest.py
+++ b/swh/web/tests/conftest.py
@@ -18,7 +18,6 @@
from django.core.cache import cache
from rest_framework.test import APIClient, APIRequestFactory
-from swh.auth.pytest_plugin import keycloak_mock_factory
from swh.model.hashutil import ALGORITHMS, hash_to_bytes
from swh.storage.algos.origin import origin_get_latest_visit_status
from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest
@@ -28,6 +27,8 @@
from swh.web.config import get_config
from swh.web.tests.data import get_tests_data, override_storages
+pytest_plugins = ["swh.auth.pytest_plugin"]
+
# Used to skip some tests
ctags_json_missing = (
shutil.which("ctags") is None
@@ -361,22 +362,15 @@
yield converters.from_swh(ctag, hashess={"id"})
-_keycloak_config = get_config()["keycloak"]
-
-_keycloak_mock = keycloak_mock_factory(
- server_url=_keycloak_config["server_url"],
- realm_name=_keycloak_config["realm_name"],
- client_id=OIDC_SWH_WEB_CLIENT_ID,
-)
+@pytest.fixture
+def keycloak_oidc(keycloak_oidc, mocker):
+ keycloak_config = get_config()["keycloak"]
+ keycloak_oidc.server_url = keycloak_config["server_url"]
+ keycloak_oidc.realm_name = keycloak_config["realm_name"]
+ keycloak_oidc.client_id = OIDC_SWH_WEB_CLIENT_ID
-@pytest.fixture
-def keycloak_mock(_keycloak_mock, mocker):
- for oidc_client_factory in (
- "swh.web.auth.views.get_oidc_client",
- "swh.web.auth.backends.get_oidc_client",
- ):
- mock_get_oidc_client = mocker.patch(oidc_client_factory)
- mock_get_oidc_client.return_value = _keycloak_mock
+ keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client")
+ keycloak_oidc_client.return_value = keycloak_oidc
- return _keycloak_mock
+ return keycloak_oidc