diff --git a/requirements-swh.txt b/requirements-swh.txt index b95db0f5..3632177f 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,8 +1,8 @@ -swh.auth[django] >= 0.3.7 +swh.auth[django] >= 0.5.0 swh.core >= 0.0.95 swh.indexer >= 0.4.1 swh.model >= 0.5.0 swh.scheduler >= 0.7.0 swh.search >= 0.2.0 swh.storage >= 0.11.10 swh.vault >= 0.0.33 diff --git a/requirements.txt b/requirements.txt index c0924dde..887e4561 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,25 +1,24 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html beautifulsoup4 cryptography django < 3 django-cors-headers django-js-reverse djangorestframework django-webpack-loader docutils htmlmin iso8601 lxml prometheus-client pybadges pygments -python-keycloak >= 0.19.0 python-magic >= 0.4.0 python-memcached pyyaml requests sentry-sdk typing-extensions diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py deleted file mode 100644 index 1ad9523d..00000000 --- a/swh/web/auth/backends.py +++ /dev/null @@ -1,190 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from datetime import datetime, timedelta -import hashlib -from typing import Any, Dict, Optional - -import sentry_sdk - -from django.core.cache import cache -from django.http import HttpRequest -from django.utils import timezone -from rest_framework.authentication import BaseAuthentication -from rest_framework.exceptions import AuthenticationFailed, ValidationError - -from swh.auth.django.models import OIDCUser -from swh.web.auth.utils import get_oidc_client - - -def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: - # compute an integer user identifier for Django User model - # by concatenating all groups of the UUID4 user identifier - # generated by Keycloak and converting it from hex to decimal - user_id = int("".join(decoded_token["sub"].split("-")), 16) - - # create a Django user that will not be saved to database - user = OIDCUser( - id=user_id, - username=decoded_token["preferred_username"], - password="", - first_name=decoded_token["given_name"], - last_name=decoded_token["family_name"], - email=decoded_token["email"], - ) - - # set is_staff user property based on groups - if "groups" in decoded_token: - user.is_staff = "/staff" in decoded_token["groups"] - - # extract user permissions if any - resource_access = decoded_token.get("resource_access", {}) - client_resource_access = resource_access.get(get_oidc_client().client_id, {}) - user.permissions = set(client_resource_access.get("roles", [])) - - # add user sub to custom User proxy model - user.sub = decoded_token["sub"] - - return user - - -def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: - - oidc_client = get_oidc_client() - - # decode JWT token - try: - access_token = oidc_profile["access_token"] - decoded_token = oidc_client.decode_token(access_token) - # access token has expired or is invalid - except Exception: - # get a new access token from authentication provider - oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) - # decode access token - decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) - - # create OIDCUser from decoded token - user = _oidc_user_from_decoded_token(decoded_token) - - # get authentication init datetime - auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) - exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) - - # compute OIDC tokens expiration date - oidc_profile["expires_at"] = exp_datetime - oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( - seconds=oidc_profile["refresh_expires_in"] - ) - - # add OIDC profile data to custom User proxy model - for key, val in oidc_profile.items(): - if hasattr(user, key): - setattr(user, key, val) - - # put OIDC profile in cache or update it after token renewal - cache_key = f"oidc_user_{user.id}" - if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]: - # set cache key TTL as refresh token expiration time - assert user.refresh_expires_at - ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) - - # save oidc_profile in cache - cache.set(cache_key, oidc_profile, timeout=max(0, ttl)) - - return user - - -class OIDCAuthorizationCodePKCEBackend: - def authenticate( - self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str - ) -> Optional[OIDCUser]: - - user = None - try: - # try to authenticate user with OIDC PKCE authorization code flow - oidc_profile = get_oidc_client().authorization_code( - code, redirect_uri, code_verifier=code_verifier - ) - - # create Django user - user = _oidc_user_from_profile(oidc_profile) - - except Exception as e: - sentry_sdk.capture_exception(e) - - return user - - def get_user(self, user_id: int) -> Optional[OIDCUser]: - # get oidc profile from cache - oidc_profile = cache.get(f"oidc_user_{user_id}") - if oidc_profile: - try: - user = _oidc_user_from_profile(oidc_profile) - # restore auth backend - setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") - return user - except Exception as e: - sentry_sdk.capture_exception(e) - return None - else: - return None - - -class OIDCBearerTokenAuthentication(BaseAuthentication): - def authenticate(self, request): - auth_header = request.META.get("HTTP_AUTHORIZATION") - if auth_header is None: - return None - - try: - auth_type, refresh_token = auth_header.split(" ", 1) - except ValueError: - raise AuthenticationFailed("Invalid HTTP authorization header format") - - if auth_type != "Bearer": - raise AuthenticationFailed( - (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).") - ) - try: - - oidc_client = get_oidc_client() - - # compute a cache key from the token that does not exceed - # memcached key size limit - hasher = hashlib.sha1() - hasher.update(refresh_token.encode("ascii")) - cache_key = f"api_token_{hasher.hexdigest()}" - - # check if an access token is cached - access_token = cache.get(cache_key) - - # attempt to decode access token - try: - decoded_token = oidc_client.decode_token(access_token) - except Exception: - # access token is None or it has expired - decoded_token = None - - if access_token is None or decoded_token is None: - # get a new access token from authentication provider - access_token = oidc_client.refresh_token(refresh_token)["access_token"] - # decode access token - decoded_token = oidc_client.decode_token(access_token) - # compute access token expiration - exp = datetime.fromtimestamp(decoded_token["exp"]) - ttl = int(exp.timestamp() - timezone.now().timestamp()) - # save access token in cache while it is valid - cache.set(cache_key, access_token, timeout=max(0, ttl)) - - # create Django user - user = _oidc_user_from_decoded_token(decoded_token) - except UnicodeEncodeError as e: - sentry_sdk.capture_exception(e) - raise ValidationError("Invalid bearer token") - except Exception as e: - sentry_sdk.capture_exception(e) - raise AuthenticationFailed(str(e)) - - return user, None diff --git a/swh/web/auth/middlewares.py b/swh/web/auth/middlewares.py deleted file mode 100644 index 2d8e166e..00000000 --- a/swh/web/auth/middlewares.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from django.contrib.auth import BACKEND_SESSION_KEY -from django.http.response import HttpResponseRedirect - -from swh.web.common.utils import reverse - - -class OIDCSessionExpiredMiddleware: - """ - Middleware for checking OIDC user session expiration. - """ - - def __init__(self, get_response=None): - self.get_response = get_response - self.exempted_urls = [ - reverse(v) - for v in ("logout", "oidc-login", "oidc-login-complete", "oidc-logout") - ] - - def __call__(self, request): - if ( - request.method != "GET" - or request.user.is_authenticated - or BACKEND_SESSION_KEY not in request.session - or "OIDC" not in request.session[BACKEND_SESSION_KEY] - or request.path in self.exempted_urls - ): - return self.get_response(request) - - # At that point, we know that a OIDC user was previously logged in - # and his session has expired. - # User will be redirected to logout page and a link will be offered to - # login again. - next_path = request.get_full_path() - logout_url = reverse( - "logout", query_params={"next_path": next_path, "remote_user": 1} - ) - return HttpResponseRedirect(logout_url) diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py index 87f12fb1..fe2303a0 100644 --- a/swh/web/auth/utils.py +++ b/swh/web/auth/utils.py @@ -1,128 +1,70 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import urlsafe_b64encode -import hashlib -import secrets -from typing import Dict, Tuple from cryptography.fernet import Fernet from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -from swh.auth.keycloak import KeycloakOpenIDConnect -from swh.web.config import get_config - - -def gen_oidc_pkce_codes() -> Tuple[str, str]: - """ - Generates a code verifier and a code challenge to be used - with the OpenID Connect authorization code flow with PKCE - ("Proof Key for Code Exchange", see https://tools.ietf.org/html/rfc7636). - - PKCE replaces the static secret used in the standard authorization - code flow with a temporary one-time challenge, making it feasible - to use in public clients. - - The implementation is inspired from that blog post: - https://www.stefaanlippens.net/oauth-code-flow-pkce.html - """ - # generate a code verifier which is a long enough random alphanumeric - # string, only to be used "client side" - code_verifier_str = secrets.token_urlsafe(60) - - # create the PKCE code challenge by hashing the code verifier with SHA256 - # and encoding the result in URL-safe base64 (without padding) - code_challenge = hashlib.sha256(code_verifier_str.encode("ascii")).digest() - code_challenge_str = urlsafe_b64encode(code_challenge).decode("ascii") - code_challenge_str = code_challenge_str.replace("=", "") - - return code_verifier_str, code_challenge_str - - OIDC_SWH_WEB_CLIENT_ID = "swh-web" def _get_fernet(password: bytes, salt: bytes) -> Fernet: """ Instantiate a Fernet system from a password and a salt value (see https://cryptography.io/en/latest/fernet/). Args: password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The Fernet system """ kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend(), ) key = urlsafe_b64encode(kdf.derive(password)) return Fernet(key) def encrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes: """ Encrypt data using Fernet system (symmetric encryption). Args: data: input data to encrypt password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The encrypted data """ return _get_fernet(password, salt).encrypt(data) def decrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes: """ Decrypt data using Fernet system (symmetric encryption). Args: data: input data to decrypt password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The decrypted data """ return _get_fernet(password, salt).decrypt(data) - - -# stores instances of KeycloakOpenIDConnect class -# dict keys are (realm_name, client_id) tuples -_keycloak_oidc: Dict[str, KeycloakOpenIDConnect] = {} - - -def get_oidc_client(client_id: str = OIDC_SWH_WEB_CLIENT_ID) -> KeycloakOpenIDConnect: - """ - Instantiate a KeycloakOpenIDConnect class for a given client in the - SoftwareHeritage realm. - - Args: - client_id: client identifier in the SoftwareHeritage realm - - Returns: - An object to ease the interaction with the Keycloak server - """ - keycloak_config = get_config()["keycloak"] - - if client_id not in _keycloak_oidc: - _keycloak_oidc[client_id] = KeycloakOpenIDConnect( - keycloak_config["server_url"], keycloak_config["realm_name"], client_id - ) - return _keycloak_oidc[client_id] diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index f1b56647..4e1b937d 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,273 +1,161 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, cast -import uuid from cryptography.fernet import InvalidToken from django.conf.urls import url -from django.contrib.auth import authenticate, login, logout from django.contrib.auth.decorators import login_required -from django.core.cache import cache from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( HttpResponse, HttpResponseForbidden, HttpResponseRedirect, JsonResponse, ) from django.shortcuts import render from django.views.decorators.http import require_http_methods from swh.auth.django.models import OIDCUser +from swh.auth.django.utils import keycloak_oidc_client +from swh.auth.django.views import get_oidc_login_data, oidc_login_view +from swh.auth.django.views import urlpatterns as auth_urlpatterns from swh.web.auth.models import OIDCUserOfflineTokens -from swh.web.auth.utils import ( - decrypt_data, - encrypt_data, - gen_oidc_pkce_codes, - get_oidc_client, -) -from swh.web.common.exc import BadInputExc, ForbiddenExc +from swh.web.auth.utils import decrypt_data, encrypt_data +from swh.web.common.exc import ForbiddenExc from swh.web.common.utils import reverse from swh.web.config import get_config -def _oidc_login(request: HttpRequest, redirect_uri: str, scope: str = "openid"): - # generate a CSRF token - state = str(uuid.uuid4()) - - code_verifier, code_challenge = gen_oidc_pkce_codes() - - request.session["login_data"] = { - "code_verifier": code_verifier, - "state": state, - "redirect_uri": redirect_uri, - "next_path": request.GET.get("next_path", ""), - } - - authorization_url_params = { - "state": state, - "code_challenge": code_challenge, - "code_challenge_method": "S256", - "scope": scope, - } - - oidc_client = get_oidc_client() - authorization_url = oidc_client.authorization_url( - redirect_uri, **authorization_url_params - ) - - return HttpResponseRedirect(authorization_url) - - -def oidc_login(request: HttpRequest) -> HttpResponse: - """ - Django view to initiate login process using OpenID Connect. - """ - - redirect_uri = reverse("oidc-login-complete", request=request) - - return _oidc_login(request, redirect_uri=redirect_uri) - - -def _get_login_data(request: HttpRequest) -> Dict[str, Any]: - if "login_data" not in request.session: - raise Exception("Login process has not been initialized.") - - return request.session["login_data"] - - -def _check_login_data(request: HttpRequest, login_data: Dict[str, Any]): - - if "code" not in request.GET or "state" not in request.GET: - raise BadInputExc("Missing query parameters for authentication.") - - # get CSRF token returned by OIDC server - state = request.GET["state"] - - if state != login_data["state"]: - raise BadInputExc("Wrong CSRF token, aborting login process.") - - -def oidc_login_complete(request: HttpRequest) -> HttpResponse: - """ - Django view to finalize login process using OpenID Connect. - """ - login_data = _get_login_data(request) - next_path = login_data["next_path"] or request.build_absolute_uri("/") - - if "error" in request.GET: - raise Exception(request.GET["error"]) - - _check_login_data(request, login_data) - - user = authenticate( - request=request, - code=request.GET["code"], - code_verifier=login_data["code_verifier"], - redirect_uri=login_data["redirect_uri"], - ) - - if user is None: - raise Exception("User authentication failed.") - - login(request, user) - - return HttpResponseRedirect(next_path) - - -def oidc_logout(request: HttpRequest) -> HttpResponse: - """ - Django view to logout using OpenID Connect. - """ - user = request.user - logout(request) - if hasattr(user, "refresh_token"): - oidc_client = get_oidc_client() - user = cast(OIDCUser, user) - refresh_token = cast(str, user.refresh_token) - # end OpenID Connect session - oidc_client.logout(refresh_token) - # remove user data from cache - cache.delete(f"oidc_user_{user.id}") - - logout_url = reverse("logout", query_params={"remote_user": 1}) - return HttpResponseRedirect(request.build_absolute_uri(logout_url)) - - def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) - return _oidc_login( + return oidc_login_view( request, redirect_uri=redirect_uri, scope="openid offline_access" ) def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): raise ForbiddenExc("You are not allowed to generate bearer tokens.") if "error" in request.GET: raise Exception(request.GET["error"]) - oidc_client = get_oidc_client() - login_data = _get_login_data(request) - _check_login_data(request, login_data) + login_data = get_oidc_login_data(request) + oidc_client = keycloak_oidc_client() oidc_profile = oidc_client.authorization_code( code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) user = cast(OIDCUser, request.user) token = oidc_profile["refresh_token"] secret = get_config()["secret_key"].encode() salt = user.sub.encode() encrypted_token = encrypt_data(token.encode(), secret, salt) OIDCUserOfflineTokens.objects.create( user_id=str(user.id), offline_token=encrypted_token ).save() return HttpResponseRedirect(reverse("oidc-profile") + "#tokens") def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id)) tokens = tokens.order_by("-creation_date") length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(tokens, length) tokens_data = [ {"id": t.id, "creation_date": t.creation_date.isoformat()} for t in paginator.page(int(page)).object_list ] table_data: Dict[str, Any] = {} table_data["recordsTotal"] = len(tokens_data) table_data["draw"] = int(request.GET["draw"]) table_data["data"] = tokens_data table_data["recordsFiltered"] = len(tokens_data) return JsonResponse(table_data) @require_http_methods(["POST"]) def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, secret, salt) return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") except InvalidToken: return HttpResponse(status=401) @require_http_methods(["POST"]) def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, secret, salt) - oidc_client = get_oidc_client() + oidc_client = keycloak_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) except InvalidToken: return HttpResponse(status=401) @login_required(login_url="/oidc/login/", redirect_field_name="next_path") def _oidc_profile_view(request: HttpRequest) -> HttpResponse: return render(request, "auth/profile.html") -urlpatterns = [ - url(r"^oidc/login/$", oidc_login, name="oidc-login"), - url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"), - url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"), +urlpatterns = auth_urlpatterns + [ url( r"^oidc/generate-bearer-token/$", oidc_generate_bearer_token, name="oidc-generate-bearer-token", ), url( r"^oidc/generate-bearer-token-complete/$", oidc_generate_bearer_token_complete, name="oidc-generate-bearer-token-complete", ), url( r"^oidc/list-bearer-token/$", oidc_list_bearer_tokens, name="oidc-list-bearer-tokens", ), url( r"^oidc/get-bearer-token/$", oidc_get_bearer_token, name="oidc-get-bearer-token", ), url( r"^oidc/revoke-bearer-tokens/$", oidc_revoke_bearer_tokens, name="oidc-revoke-bearer-tokens", ), url(r"^oidc/profile/$", _oidc_profile_view, name="oidc-profile",), ] diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py index cd9f5040..4ed4e495 100644 --- a/swh/web/settings/common.py +++ b/swh/web/settings/common.py @@ -1,282 +1,288 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """ Django common settings for swh-web. """ import os import sys from typing import Any, Dict +from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.config import get_config swh_web_config = get_config() # Build paths inside the project like this: os.path.join(BASE_DIR, ...) PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = swh_web_config["secret_key"] # SECURITY WARNING: don't run with debug turned on in production! DEBUG = swh_web_config["debug"] DEBUG_PROPAGATE_EXCEPTIONS = swh_web_config["debug"] ALLOWED_HOSTS = ["127.0.0.1", "localhost"] + swh_web_config["allowed_hosts"] # Application definition INSTALLED_APPS = [ "django.contrib.admin", "django.contrib.auth", "django.contrib.contenttypes", "django.contrib.sessions", "django.contrib.messages", "django.contrib.staticfiles", "rest_framework", "swh.web.common", "swh.web.api", "swh.web.auth", "swh.web.browse", "webpack_loader", "django_js_reverse", "corsheaders", ] MIDDLEWARE = [ "django.middleware.security.SecurityMiddleware", "django.contrib.sessions.middleware.SessionMiddleware", "corsheaders.middleware.CorsMiddleware", "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", - "swh.web.auth.middlewares.OIDCSessionExpiredMiddleware", + "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", "swh.web.common.middlewares.ThrottlingHeadersMiddleware", "swh.web.common.middlewares.ExceptionMiddleware", ] # Compress all assets (static ones and dynamically generated html) # served by django in a local development environment context. # In a production environment, assets compression will be directly # handled by web servers like apache or nginx. if swh_web_config["serve_assets"]: MIDDLEWARE.insert(0, "django.middleware.gzip.GZipMiddleware") ROOT_URLCONF = "swh.web.urls" TEMPLATES = [ { "BACKEND": "django.template.backends.django.DjangoTemplates", "DIRS": [os.path.join(PROJECT_DIR, "../templates")], "APP_DIRS": True, "OPTIONS": { "context_processors": [ "django.template.context_processors.debug", "django.template.context_processors.request", "django.contrib.auth.context_processors.auth", "django.contrib.messages.context_processors.messages", "swh.web.common.utils.context_processor", ], "libraries": {"swh_templatetags": "swh.web.common.swh_templatetags",}, }, }, ] DATABASES = { "default": { "ENGINE": "django.db.backends.sqlite3", "NAME": swh_web_config["development_db"], } } # Password validation # https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators AUTH_PASSWORD_VALIDATORS = [ { "NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator", # noqa }, {"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",}, {"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",}, {"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",}, ] # Internationalization # https://docs.djangoproject.com/en/1.11/topics/i18n/ LANGUAGE_CODE = "en-us" TIME_ZONE = "UTC" USE_I18N = True USE_L10N = True USE_TZ = True # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/1.11/howto/static-files/ STATIC_URL = "/static/" # static folder location when swh-web has been installed with pip STATIC_DIR = os.path.join(sys.prefix, "share/swh/web/static") if not os.path.exists(STATIC_DIR): # static folder location when developping swh-web STATIC_DIR = os.path.join(PROJECT_DIR, "../../../static") STATICFILES_DIRS = [STATIC_DIR] INTERNAL_IPS = ["127.0.0.1"] throttle_rates = {} http_requests = ["GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"] throttling = swh_web_config["throttling"] for limiter_scope, limiter_conf in throttling["scopes"].items(): if "default" in limiter_conf["limiter_rate"]: throttle_rates[limiter_scope] = limiter_conf["limiter_rate"]["default"] # for backward compatibility else: throttle_rates[limiter_scope] = limiter_conf["limiter_rate"] # register sub scopes specific for HTTP request types for http_request in http_requests: if http_request in limiter_conf["limiter_rate"]: throttle_rates[limiter_scope + "_" + http_request.lower()] = limiter_conf[ "limiter_rate" ][http_request] REST_FRAMEWORK: Dict[str, Any] = { "DEFAULT_RENDERER_CLASSES": ( "rest_framework.renderers.JSONRenderer", "swh.web.api.renderers.YAMLRenderer", "rest_framework.renderers.TemplateHTMLRenderer", ), "DEFAULT_THROTTLE_CLASSES": ("swh.web.api.throttling.SwhWebRateThrottle",), "DEFAULT_THROTTLE_RATES": throttle_rates, "DEFAULT_AUTHENTICATION_CLASSES": [ "rest_framework.authentication.SessionAuthentication", - "swh.web.auth.backends.OIDCBearerTokenAuthentication", + "swh.auth.django.backends.OIDCBearerTokenAuthentication", ], "EXCEPTION_HANDLER": "swh.web.api.apiresponse.error_response_handler", } LOGGING = { "version": 1, "disable_existing_loggers": False, "filters": { "require_debug_false": {"()": "django.utils.log.RequireDebugFalse",}, "require_debug_true": {"()": "django.utils.log.RequireDebugTrue",}, }, "formatters": { "request": { "format": "[%(asctime)s] [%(levelname)s] %(request)s %(status_code)s", "datefmt": "%d/%b/%Y %H:%M:%S", }, "simple": { "format": "[%(asctime)s] [%(levelname)s] %(message)s", "datefmt": "%d/%b/%Y %H:%M:%S", }, "verbose": { "format": ( "[%(asctime)s] [%(levelname)s] %(name)s.%(funcName)s:%(lineno)s " "- %(message)s" ), "datefmt": "%d/%b/%Y %H:%M:%S", }, }, "handlers": { "console": { "level": "DEBUG", "filters": ["require_debug_true"], "class": "logging.StreamHandler", "formatter": "simple", }, "file": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "simple", }, "file_request": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "request", }, "console_verbose": { "level": "DEBUG", "filters": ["require_debug_true"], "class": "logging.StreamHandler", "formatter": "verbose", }, "file_verbose": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "verbose", }, "null": {"class": "logging.NullHandler",}, }, "loggers": { "": { "handlers": ["console_verbose", "file_verbose"], "level": "DEBUG" if DEBUG else "WARNING", }, "django": { "handlers": ["console"], "level": "DEBUG" if DEBUG else "WARNING", "propagate": False, }, "django.request": { "handlers": ["file_request"], "level": "DEBUG" if DEBUG else "WARNING", "propagate": False, }, "django.db.backends": {"handlers": ["null"], "propagate": False}, "django.utils.autoreload": {"level": "INFO",}, }, } WEBPACK_LOADER = { "DEFAULT": { "CACHE": False, "BUNDLE_DIR_NAME": "./", "STATS_FILE": os.path.join(STATIC_DIR, "webpack-stats.json"), "POLL_INTERVAL": 0.1, "TIMEOUT": None, "IGNORE": [".+\\.hot-update.js", ".+\\.map"], } } LOGIN_URL = "/admin/login/" LOGIN_REDIRECT_URL = "admin" SESSION_ENGINE = "django.contrib.sessions.backends.cache" CACHES = { "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}, } JS_REVERSE_JS_MINIFY = False CORS_ORIGIN_ALLOW_ALL = True CORS_URLS_REGEX = r"^/(badge|api)/.*$" AUTHENTICATION_BACKENDS = [ "django.contrib.auth.backends.ModelBackend", - "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend", + "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend", ] + +SWH_AUTH_SERVER_URL = swh_web_config["keycloak"]["server_url"] +SWH_AUTH_REALM_NAME = swh_web_config["keycloak"]["realm_name"] +SWH_AUTH_CLIENT_ID = OIDC_SWH_WEB_CLIENT_ID +SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW = "logout" diff --git a/swh/web/templates/layout.html b/swh/web/templates/layout.html index 47676ddc..6673686b 100644 --- a/swh/web/templates/layout.html +++ b/swh/web/templates/layout.html @@ -1,275 +1,275 @@ {% comment %} Copyright (C) 2015-2020 The Software Heritage developers See the AUTHORS file at the top-level directory of this distribution License: GNU Affero General Public License version 3, or any later version See top-level LICENSE file for more information {% endcomment %} {% load js_reverse %} {% load static %} {% load render_bundle from webpack_loader %} {% load swh_templatetags %} {% block title %}{% endblock %} {% render_bundle 'vendors' %} {% render_bundle 'webapp' %} {% block header %}{% endblock %} {% if "production" in DJANGO_SETTINGS_MODULE %} {% endif %}
{% if swh_web_staging %}
Staging
{% endif %} {% block content %}{% endblock %}
{% include "includes/global-modals.html" %}
back to top
diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/tests/api/views/test_graph.py index 7b4a65f4..d4d3b52a 100644 --- a/swh/web/tests/api/views/test_graph.py +++ b/swh/web/tests/api/views/test_graph.py @@ -1,261 +1,261 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import json import textwrap from hypothesis import given from django.http.response import StreamingHttpResponse from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import ExtendedObjectType, ExtendedSWHID from swh.web.api.views.graph import API_GRAPH_PERM from swh.web.common.utils import reverse from swh.web.config import SWH_WEB_INTERNAL_SERVER_NAME, get_config from swh.web.tests.strategies import origin from swh.web.tests.utils import check_http_get_response def test_graph_endpoint_no_authentication_for_vpn_users(api_client, requests_mock): graph_query = "stats" url = reverse("api-1-graph", url_args={"graph_query": graph_query}) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json={}, headers={"Content-Type": "application/json"}, ) check_http_get_response( api_client, url, status_code=200, server_name=SWH_WEB_INTERNAL_SERVER_NAME ) def test_graph_endpoint_needs_authentication(api_client): url = reverse("api-1-graph", url_args={"graph_query": "stats"}) check_http_get_response(api_client, url, status_code=401) -def _authenticate_graph_user(api_client, keycloak_mock): - keycloak_mock.user_permissions = [API_GRAPH_PERM] - oidc_profile = keycloak_mock.login() +def _authenticate_graph_user(api_client, keycloak_oidc): + keycloak_oidc.user_permissions = [API_GRAPH_PERM] + oidc_profile = keycloak_oidc.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") -def test_graph_endpoint_needs_permission(api_client, keycloak_mock, requests_mock): +def test_graph_endpoint_needs_permission(api_client, keycloak_oidc, requests_mock): graph_query = "stats" url = reverse("api-1-graph", url_args={"graph_query": graph_query}) - oidc_profile = keycloak_mock.login() + oidc_profile = keycloak_oidc.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") check_http_get_response(api_client, url, status_code=403) - _authenticate_graph_user(api_client, keycloak_mock) + _authenticate_graph_user(api_client, keycloak_oidc) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json={}, headers={"Content-Type": "application/json"}, ) check_http_get_response(api_client, url, status_code=200) -def test_graph_text_plain_response(api_client, keycloak_mock, requests_mock): - _authenticate_graph_user(api_client, keycloak_mock) +def test_graph_text_plain_response(api_client, keycloak_oidc, requests_mock): + _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323" response_text = textwrap.dedent( """\ swh:1:cnt:1d3dace0a825b0535c37c53ed669ef817e9c1b47 swh:1:cnt:6d5b280f4e33589ae967a7912a587dd5cb8dedaa swh:1:cnt:91bef238bf01356a550d416d14bb464c576ac6f4 swh:1:cnt:58a8b925a463b87d49639fda282b8f836546e396 swh:1:cnt:fd32ee0a87e16ccc853dfbeb7018674f9ce008c0 swh:1:cnt:ab7c39871872589a4fc9e249ebc927fb1042c90d swh:1:cnt:93073c02bf3869845977527de16af4d54765838d swh:1:cnt:4251f795b52c54c447a97c9fe904d8b1f993b1e0 swh:1:cnt:c6e7055424332006d07876ffeba684e7e284b383 swh:1:cnt:8459d8867dc3b15ef7ae9683e21cccc9ab2ec887 swh:1:cnt:5f9981d52202815aa947f85b9dfa191b66f51138 swh:1:cnt:00a685ec51bcdf398c15d588ecdedb611dbbab4b swh:1:cnt:e1cf1ea335106a0197a2f92f7804046425a7d3eb swh:1:cnt:07069b38087f88ec192d2c9aff75a502476fd17d swh:1:cnt:f045ee845c7f14d903a2c035b2691a7c400c01f0 """ ) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_text, headers={"Content-Type": "text/plain", "Transfer-Encoding": "chunked"}, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) resp = check_http_get_response( api_client, url, status_code=200, content_type="text/plain" ) assert isinstance(resp, StreamingHttpResponse) assert b"".join(resp.streaming_content) == response_text.encode() _response_json = { "counts": {"nodes": 17075708289, "edges": 196236587976}, "ratios": { "compression": 0.16, "bits_per_node": 58.828, "bits_per_edge": 5.119, "avg_locality": 2184278529.729, }, "indegree": {"min": 0, "max": 263180117, "avg": 11.4921492364925}, "outdegree": {"min": 0, "max": 1033207, "avg": 11.4921492364925}, } -def test_graph_json_response(api_client, keycloak_mock, requests_mock): - _authenticate_graph_user(api_client, keycloak_mock) +def test_graph_json_response(api_client, keycloak_oidc, requests_mock): + _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "stats" requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json=_response_json, headers={"Content-Type": "application/json"}, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == "application/json" assert resp.content == json.dumps(_response_json).encode() -def test_graph_ndjson_response(api_client, keycloak_mock, requests_mock): - _authenticate_graph_user(api_client, keycloak_mock) +def test_graph_ndjson_response(api_client, keycloak_oidc, requests_mock): + _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb" response_ndjson = textwrap.dedent( """\ ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:acfb7cabd63b368a03a9df87670ece1488c8bce0"] ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:2a0837708151d76edf28fdbb90dc3eabc676cff3"] ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:eaf025ad54b94b2fdda26af75594cfae3491ec75"] """ ) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_ndjson, headers={ "Content-Type": "application/x-ndjson", "Transfer-Encoding": "chunked", }, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) resp = check_http_get_response(api_client, url, status_code=200) assert isinstance(resp, StreamingHttpResponse) assert resp["Content-Type"] == "application/x-ndjson" assert b"".join(resp.streaming_content) == response_ndjson.encode() @given(origin()) def test_graph_response_resolve_origins( - archive_data, api_client, keycloak_mock, requests_mock, origin + archive_data, api_client, keycloak_oidc, requests_mock, origin ): hasher = hashlib.sha1() hasher.update(origin["url"].encode()) origin_sha1 = hasher.digest() origin_swhid = str( ExtendedSWHID(object_type=ExtendedObjectType.ORIGIN, object_id=origin_sha1) ) snapshot = archive_data.snapshot_get_latest(origin["url"])["id"] snapshot_swhid = str( ExtendedSWHID( object_type=ExtendedObjectType.SNAPSHOT, object_id=hash_to_bytes(snapshot) ) ) - _authenticate_graph_user(api_client, keycloak_mock) + _authenticate_graph_user(api_client, keycloak_oidc) for graph_query, response_text, content_type in ( ( f"visit/nodes/{snapshot_swhid}", f"{snapshot_swhid}\n{origin_swhid}\n", "text/plain", ), ( f"visit/edges/{snapshot_swhid}", f"{snapshot_swhid} {origin_swhid}\n", "text/plain", ), ( f"visit/paths/{snapshot_swhid}", f'["{snapshot_swhid}", "{origin_swhid}"]\n', "application/x-ndjson", ), ): # set two lines response to check resolved origins cache response_text = response_text + response_text requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_text, headers={"Content-Type": content_type, "Transfer-Encoding": "chunked"}, ) url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"direction": "backward"}, ) resp = check_http_get_response(api_client, url, status_code=200) assert isinstance(resp, StreamingHttpResponse) assert resp["Content-Type"] == content_type assert b"".join(resp.streaming_content) == response_text.encode() url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"direction": "backward", "resolve_origins": "true"}, ) resp = check_http_get_response(api_client, url, status_code=200) assert isinstance(resp, StreamingHttpResponse) assert resp["Content-Type"] == content_type assert ( b"".join(resp.streaming_content) == response_text.replace(origin_swhid, origin["url"]).encode() ) def test_graph_response_resolve_origins_nothing_to_do( - api_client, keycloak_mock, requests_mock + api_client, keycloak_oidc, requests_mock ): - _authenticate_graph_user(api_client, keycloak_mock) + _authenticate_graph_user(api_client, keycloak_oidc) graph_query = "stats" requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json=_response_json, headers={"Content-Type": "application/json"}, ) url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"resolve_origins": "true"}, ) resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == "application/json" assert resp.content == json.dumps(_response_json).encode() diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py deleted file mode 100644 index c71ec520..00000000 --- a/swh/web/tests/auth/test_api_auth.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import pytest - -from django.contrib.auth.models import AnonymousUser, User - -from swh.auth.django.models import OIDCUser -from swh.web.common.utils import reverse -from swh.web.tests.utils import check_api_get_responses, check_http_get_response - - -@pytest.mark.django_db -def test_drf_django_session_auth_success(keycloak_mock, client): - """ - Check user gets authenticated when querying the web api - through a web browser. - """ - url = reverse("api-1-stat-counters") - - client.login(code="", code_verifier="", redirect_uri="") - - response = check_http_get_response(client, url, status_code=200) - request = response.wsgi_request - - # user should be authenticated - assert isinstance(request.user, OIDCUser) - - # check remoter used has not been saved to Django database - with pytest.raises(User.DoesNotExist): - User.objects.get(username=request.user.username) - - -@pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_success(keycloak_mock, api_client): - """ - Check user gets authenticated when querying the web api - through an HTTP client using bearer token authentication. - """ - url = reverse("api-1-stat-counters") - - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - - api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - - response = check_api_get_responses(api_client, url, status_code=200) - request = response.wsgi_request - - # user should be authenticated - assert isinstance(request.user, OIDCUser) - - # check remoter used has not been saved to Django database - with pytest.raises(User.DoesNotExist): - User.objects.get(username=request.user.username) - - -@pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_failure(keycloak_mock, api_client): - url = reverse("api-1-stat-counters") - - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - - # check for failed authentication but with expected token format - keycloak_mock.set_auth_success(False) - api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - - response = check_api_get_responses(api_client, url, status_code=403) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) - - # check for failed authentication when token format is invalid - api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà") - - response = check_api_get_responses(api_client, url, status_code=400) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) - - -def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_mock, api_client): - url = reverse("api-1-stat-counters") - - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - - # missing authorization type - api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}") - - response = check_api_get_responses(api_client, url, status_code=403) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) - - # invalid authorization type - api_client.credentials(HTTP_AUTHORIZATION="Foo token") - - response = check_api_get_responses(api_client, url, status_code=403) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py deleted file mode 100644 index 37cad77b..00000000 --- a/swh/web/tests/auth/test_backends.py +++ /dev/null @@ -1,271 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from datetime import datetime, timedelta -from unittest.mock import Mock - -import pytest - -from django.conf import settings -from django.contrib.auth import authenticate, get_backends -from rest_framework.exceptions import AuthenticationFailed - -from swh.auth.django.models import OIDCUser -from swh.web.auth.backends import OIDCBearerTokenAuthentication -from swh.web.common.utils import reverse - - -def _authenticate_user(request_factory): - request = request_factory.get(reverse("oidc-login-complete")) - - return authenticate( - request=request, - code="some-code", - code_verifier="some-code-verifier", - redirect_uri="https://localhost:5004", - ) - - -def _check_authenticated_user(user, decoded_token, kc_oidc_mock): - assert user is not None - assert isinstance(user, OIDCUser) - assert user.id != 0 - assert user.username == decoded_token["preferred_username"] - assert user.password == "" - assert user.first_name == decoded_token["given_name"] - assert user.last_name == decoded_token["family_name"] - assert user.email == decoded_token["email"] - assert user.is_staff == ("/staff" in decoded_token["groups"]) - assert user.sub == decoded_token["sub"] - resource_access = decoded_token.get("resource_access", {}) - resource_access_client = resource_access.get(kc_oidc_mock, {}) - assert user.permissions == set(resource_access_client.get("roles", [])) - - -@pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_success(keycloak_mock, request_factory): - """ - Checks successful login based on OpenID Connect with PKCE extension - Django authentication backend (login from Web UI). - """ - keycloak_mock.user_groups = ["/staff"] - - oidc_profile = keycloak_mock.login() - user = _authenticate_user(request_factory) - - decoded_token = keycloak_mock.decode_token(user.access_token) - _check_authenticated_user(user, decoded_token, keycloak_mock) - - auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) - exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) - refresh_exp_datetime = auth_datetime + timedelta( - seconds=oidc_profile["refresh_expires_in"] - ) - - assert user.access_token == oidc_profile["access_token"] - assert user.expires_at == exp_datetime - assert user.id_token == oidc_profile["id_token"] - assert user.refresh_token == oidc_profile["refresh_token"] - assert user.refresh_expires_at == refresh_exp_datetime - assert user.scope == oidc_profile["scope"] - assert user.session_state == oidc_profile["session_state"] - - backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend" - assert user.backend == backend_path - backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) - assert get_backends()[backend_idx].get_user(user.id) == user - - -@pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_failure(keycloak_mock, request_factory): - """ - Checks failed login based on OpenID Connect with PKCE extension Django - authentication backend (login from Web UI). - """ - keycloak_mock.set_auth_success(False) - - user = _authenticate_user(request_factory) - - assert user is None - - -@pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_refresh_token_success( - keycloak_mock, request_factory -): - """ - Checks access token renewal success using refresh token. - """ - - oidc_profile = keycloak_mock.login() - decoded_token = keycloak_mock.decode_token(oidc_profile["access_token"]) - new_access_token = "new_access_token" - - def _refresh_token(refresh_token): - oidc_profile = dict(keycloak_mock.login()) - oidc_profile["access_token"] = new_access_token - return oidc_profile - - def _decode_token(access_token): - if access_token != new_access_token: - raise Exception("access token token has expired") - else: - return decoded_token - - keycloak_mock.decode_token = Mock() - keycloak_mock.decode_token.side_effect = _decode_token - keycloak_mock.refresh_token.side_effect = _refresh_token - - user = _authenticate_user(request_factory) - - oidc_profile = keycloak_mock.login() - keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"]) - - assert user is not None - - -@pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_refresh_token_failure( - keycloak_mock, request_factory -): - """ - Checks access token renewal failure using refresh token. - """ - - def _refresh_token(refresh_token): - raise Exception("OIDC session has expired") - - def _decode_token(access_token): - raise Exception("access token token has expired") - - keycloak_mock.decode_token = Mock() - keycloak_mock.decode_token.side_effect = _decode_token - keycloak_mock.refresh_token.side_effect = _refresh_token - - user = _authenticate_user(request_factory) - - oidc_profile = keycloak_mock.login() - keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"]) - - assert user is None - - -@pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_permissions(keycloak_mock, request_factory): - """ - Checks that a permission defined with OpenID Connect is correctly mapped - to a Django one when logging from Web UI. - """ - permission = "webapp.some-permission" - keycloak_mock.user_permissions = [permission] - user = _authenticate_user(request_factory) - assert user.has_perm(permission) - assert user.get_all_permissions() == {permission} - assert user.get_group_permissions() == {permission} - assert user.has_module_perms("webapp") - assert not user.has_module_perms("foo") - - -@pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_success(keycloak_mock, api_request_factory): - """ - Checks successful login based on OpenID Connect bearer token Django REST - Framework authentication backend (Web API login). - """ - url = reverse("api-1-stat-counters") - drf_auth_backend = OIDCBearerTokenAuthentication() - - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - access_token = oidc_profile["access_token"] - - decoded_token = keycloak_mock.decode_token(access_token) - - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - - user, _ = drf_auth_backend.authenticate(request) - _check_authenticated_user(user, decoded_token, keycloak_mock) - # oidc_profile is not filled when authenticating through bearer token - assert hasattr(user, "access_token") and user.access_token is None - - -@pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_mock, api_request_factory): - """ - Checks failed login based on OpenID Connect bearer token Django REST - Framework authentication backend (Web API login). - """ - url = reverse("api-1-stat-counters") - drf_auth_backend = OIDCBearerTokenAuthentication() - - oidc_profile = keycloak_mock.login() - - # simulate a failed authentication with a bearer token in expected format - keycloak_mock.set_auth_success(False) - - refresh_token = oidc_profile["refresh_token"] - - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - - with pytest.raises(AuthenticationFailed): - drf_auth_backend.authenticate(request) - - # simulate a failed authentication with an invalid bearer token format - request = api_request_factory.get( - url, HTTP_AUTHORIZATION="Bearer invalid-token-format" - ) - - with pytest.raises(AuthenticationFailed): - drf_auth_backend.authenticate(request) - - -def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_mock, api_request_factory): - """ - Checks failed login based on OpenID Connect bearer token Django REST - Framework authentication backend (Web API login) due to invalid - authorization header value. - """ - url = reverse("api-1-stat-counters") - drf_auth_backend = OIDCBearerTokenAuthentication() - - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - - # Invalid authorization type - request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") - - with pytest.raises(AuthenticationFailed): - drf_auth_backend.authenticate(request) - - # Missing authorization type - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") - - with pytest.raises(AuthenticationFailed): - drf_auth_backend.authenticate(request) - - -@pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_permissions( - keycloak_mock, api_request_factory -): - """ - Checks that a permission defined with OpenID Connect is correctly mapped - to a Django one when using bearer token authentication. - """ - permission = "webapp.some-permission" - keycloak_mock.user_permissions = [permission] - - drf_auth_backend = OIDCBearerTokenAuthentication() - oidc_profile = keycloak_mock.login() - refresh_token = oidc_profile["refresh_token"] - url = reverse("api-1-stat-counters") - request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") - user, _ = drf_auth_backend.authenticate(request) - - assert user.has_perm(permission) - assert user.get_all_permissions() == {permission} - assert user.get_group_permissions() == {permission} - assert user.has_module_perms("webapp") - assert not user.has_module_perms("foo") diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py deleted file mode 100644 index f6698143..00000000 --- a/swh/web/tests/auth/test_middlewares.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -import pytest - -from django.core.cache import cache -from django.test import modify_settings - -from swh.web.common.utils import reverse -from swh.web.tests.utils import check_html_get_response - - -@pytest.mark.django_db -@modify_settings( - MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]} -) -def test_oidc_session_expired_middleware_disabled(client, keycloak_mock): - # authenticate user - - client.login(code="", code_verifier="", redirect_uri="") - keycloak_mock.authorization_code.assert_called() - - url = reverse("swh-web-homepage") - - # visit url first to get user from response - response = check_html_get_response(client, url, status_code=200) - - # simulate OIDC session expiration - cache.delete(f"oidc_user_{response.wsgi_request.user.id}") - - # no redirection when session has expired - check_html_get_response(client, url, status_code=200) - - -@pytest.mark.django_db -def test_oidc_session_expired_middleware_enabled(client, keycloak_mock): - # authenticate user - client.login(code="", code_verifier="", redirect_uri="") - keycloak_mock.authorization_code.assert_called() - - url = reverse("swh-web-homepage") - - # visit url first to get user from response - response = check_html_get_response(client, url, status_code=200) - - # simulate OIDC session expiration - cache.delete(f"oidc_user_{response.wsgi_request.user.id}") - - # should redirect to logout page - resp = check_html_get_response(client, url, status_code=302) - silent_refresh_url = reverse( - "logout", query_params={"next_path": url, "remote_user": 1} - ) - assert resp["location"] == silent_refresh_url diff --git a/swh/web/tests/auth/test_utils.py b/swh/web/tests/auth/test_utils.py index 5d9bedf6..7d78c4ad 100644 --- a/swh/web/tests/auth/test_utils.py +++ b/swh/web/tests/auth/test_utils.py @@ -1,65 +1,36 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from base64 import urlsafe_b64encode -import hashlib -import re from cryptography.fernet import InvalidToken import pytest -from swh.web.auth.utils import decrypt_data, encrypt_data, gen_oidc_pkce_codes - - -def test_gen_oidc_pkce_codes(): - """ - Check generated PKCE codes respect the specification - (see https://tools.ietf.org/html/rfc7636#section-4.1) - """ - code_verifier, code_challenge = gen_oidc_pkce_codes() - - # check the code verifier only contains allowed characters - assert re.match(r"[a-zA-Z0-9-\._~]+", code_verifier) - - # check minimum and maximum authorized length for the - # code verifier - assert len(code_verifier) >= 43 - assert len(code_verifier) <= 128 - - # compute code challenge from code verifier - challenge = hashlib.sha256(code_verifier.encode("ascii")).digest() - challenge = urlsafe_b64encode(challenge).decode("ascii") - challenge = challenge.replace("=", "") - - # check base64 padding is not present - assert not code_challenge[-1].endswith("=") - # check code challenge is valid - assert code_challenge == challenge +from swh.web.auth.utils import decrypt_data, encrypt_data def test_encrypt_decrypt_data_ok(): data = b"some-data-to-encrypt" password = b"secret" salt = b"salt-value" encrypted_data = encrypt_data(data, password, salt) decrypted_data = decrypt_data(encrypted_data, password, salt) assert decrypted_data == data def test_encrypt_decrypt_data_ko(): data = b"some-data-to-encrypt" password1 = b"secret" salt1 = b"salt-value" password2 = b"secret2" salt2 = b"salt-value2" encrypted_data = encrypt_data(data, password1, salt1) for password, salt in ((password2, salt2), (password1, salt2), (password2, salt1)): with pytest.raises(InvalidToken): decrypt_data(encrypted_data, password2, salt2) diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index 2cc37dda..07436e12 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,501 +1,272 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from urllib.parse import urljoin, urlparse import uuid import pytest -from django.contrib.auth.models import AnonymousUser, User from django.http import QueryDict -from swh.auth.django.models import OIDCUser from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.tests.django_asserts import assert_contains from swh.web.tests.utils import ( check_html_get_response, check_http_get_response, check_http_post_response, ) from swh.web.urls import _default_view as homepage_view def _check_oidc_login_code_flow_data( - request, response, kc_oidc_mock, redirect_uri, scope="openid" + request, response, keycloak_oidc, redirect_uri, scope="openid" ): parsed_url = urlparse(response["location"]) - authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"] + authorization_url = keycloak_oidc.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response["location"], parsed_url.path) == authorization_url assert "client_id" in query_dict assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict assert query_dict["redirect_uri"] == redirect_uri assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict assert query_dict["scope"] == scope assert "state" in query_dict assert "code_challenge" in query_dict # check a login_data has been registered in user session assert "login_data" in request.session login_data = request.session["login_data"] assert "code_verifier" in login_data assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] return login_data -@pytest.mark.django_db -def test_oidc_login_views_success(client, keycloak_mock): - """ - Simulate a successful login authentication with OpenID Connect - authorization code flow with PKCE. - """ - # user initiates login process - login_url = reverse("oidc-login") - - # should redirect to Keycloak authentication page in order - # for a user to login with its username / password - response = check_html_get_response(client, login_url, status_code=302) - request = response.wsgi_request - - assert isinstance(request.user, AnonymousUser) - - login_data = _check_oidc_login_code_flow_data( - request, - response, - keycloak_mock, - redirect_uri=reverse("oidc-login-complete", request=request), - ) - - # once a user has identified himself in Keycloak, he is - # redirected to the 'oidc-login-complete' view to - # login in Django. - - # generate authorization code / session state in the same - # manner as Keycloak - code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" - session_state = str(uuid.uuid4()) - - login_complete_url = reverse( - "oidc-login-complete", - query_params={ - "code": code, - "state": login_data["state"], - "session_state": session_state, - }, - ) - - # login process finalization, should redirect to root url by default - response = check_html_get_response(client, login_complete_url, status_code=302) - request = response.wsgi_request - - assert response["location"] == request.build_absolute_uri("/") - - # user should be authenticated - assert isinstance(request.user, OIDCUser) - - # check remote user has not been saved to Django database - with pytest.raises(User.DoesNotExist): - User.objects.get(username=request.user.username) - - -@pytest.mark.django_db -def test_oidc_logout_view_success(client, keycloak_mock): - """ - Simulate a successful logout operation with OpenID Connect. - """ - # login our test user - client.login(code="", code_verifier="", redirect_uri="") - keycloak_mock.authorization_code.assert_called() - - # user initiates logout - oidc_logout_url = reverse("oidc-logout") - - # should redirect to logout page - response = check_html_get_response(client, oidc_logout_url, status_code=302) - request = response.wsgi_request - - logout_url = reverse("logout", query_params={"remote_user": 1}) - assert response["location"] == request.build_absolute_uri(logout_url) - - # should have been logged out in Keycloak - oidc_profile = keycloak_mock.login() - keycloak_mock.logout.assert_called_with(oidc_profile["refresh_token"]) - - # check effective logout in Django - assert isinstance(request.user, AnonymousUser) - - -@pytest.mark.django_db -def test_oidc_login_view_failure(client, keycloak_mock): - """ - Simulate a failed authentication with OpenID Connect. - """ - keycloak_mock.set_auth_success(False) - - # user initiates login process - login_url = reverse("oidc-login") - # should render an error page - response = check_html_get_response( - client, login_url, status_code=500, template_used="error.html" - ) - request = response.wsgi_request - - # no users should be logged in - assert isinstance(request.user, AnonymousUser) - - -# Simulate possible errors with OpenID Connect in the login complete view. - - -def test_oidc_login_complete_view_no_login_data(client, mocker): - # user initiates login process - login_url = reverse("oidc-login-complete") - # should render an error page - response = check_html_get_response( - client, login_url, status_code=500, template_used="error.html" - ) - - assert_contains( - response, "Login process has not been initialized.", status_code=500 - ) - - -def test_oidc_login_complete_view_missing_parameters(client, mocker): - # simulate login process has been initialized - session = client.session - session["login_data"] = { - "code_verifier": "", - "state": str(uuid.uuid4()), - "redirect_uri": "", - "next_path": "", - } - session.save() - - # user initiates login process - login_url = reverse("oidc-login-complete") - # should render an error page - response = check_html_get_response( - client, login_url, status_code=400, template_used="error.html" - ) - request = response.wsgi_request - assert_contains( - response, "Missing query parameters for authentication.", status_code=400 - ) - - # no user should be logged in - assert isinstance(request.user, AnonymousUser) - - -def test_oidc_login_complete_wrong_csrf_token(client, keycloak_mock): - # simulate login process has been initialized - session = client.session - session["login_data"] = { - "code_verifier": "", - "state": str(uuid.uuid4()), - "redirect_uri": "", - "next_path": "", - } - session.save() - - # user initiates login process - login_url = reverse( - "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"} - ) - - # should render an error page - response = check_html_get_response( - client, login_url, status_code=400, template_used="error.html" - ) - request = response.wsgi_request - assert_contains( - response, "Wrong CSRF token, aborting login process.", status_code=400 - ) - - # no user should be logged in - assert isinstance(request.user, AnonymousUser) - - -@pytest.mark.django_db -def test_oidc_login_complete_wrong_code_verifier(client, keycloak_mock): - keycloak_mock.set_auth_success(False) - - # simulate login process has been initialized - session = client.session - session["login_data"] = { - "code_verifier": "", - "state": str(uuid.uuid4()), - "redirect_uri": "", - "next_path": "", - } - session.save() - - # check authentication error is reported - login_url = reverse( - "oidc-login-complete", - query_params={"code": "some-code", "state": session["login_data"]["state"]}, - ) - - # should render an error page - response = check_html_get_response( - client, login_url, status_code=500, template_used="error.html" - ) - request = response.wsgi_request - assert_contains(response, "User authentication failed.", status_code=500) - - # no user should be logged in - assert isinstance(request.user, AnonymousUser) - - -@pytest.mark.django_db -def test_oidc_logout_view_failure(client, keycloak_mock): - """ - Simulate a failed logout operation with OpenID Connect. - """ - # login our test user - client.login(code="", code_verifier="", redirect_uri="") - - err_msg = "Authentication server error" - keycloak_mock.logout.side_effect = Exception(err_msg) - - # user initiates logout process - logout_url = reverse("oidc-logout") - # should render an error page - response = check_html_get_response( - client, logout_url, status_code=500, template_used="error.html" - ) - request = response.wsgi_request - assert_contains(response, err_msg, status_code=500) - - # user should be logged out from Django anyway - assert isinstance(request.user, AnonymousUser) - - def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default assert not hasattr(request, "user") response = homepage_view(request) assert response.status_code == 200 def test_oidc_generate_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-generate-bearer-token") check_http_get_response(client, url, status_code=403) def _generate_and_test_bearer_token(client, kc_oidc_mock): # user authenticates client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) # user initiates bearer token generation flow url = reverse("oidc-generate-bearer-token") response = check_http_get_response(client, url, status_code=302) request = response.wsgi_request redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) # check login data and redirection to Keycloak is valid login_data = _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri=redirect_uri, scope="openid offline_access", ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-generate-bearer-token-complete' view # to get and save bearer token # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) token_complete_url = reverse( "oidc-generate-bearer-token-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) nb_tokens = len(OIDCUserOfflineTokens.objects.all()) - response = check_html_get_response(client, token_complete_url, status_code=302) + response = check_http_get_response(client, token_complete_url, status_code=302) request = response.wsgi_request # check token has been generated and saved encrypted to database assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1 encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token secret = get_config()["secret_key"].encode() salt = request.user.sub.encode() decrypted_token = decrypt_data(encrypted_token, secret, salt) oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri) assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"] # should redirect to tokens management Web UI assert response["location"] == reverse("oidc-profile") + "#tokens" return decrypted_token @pytest.mark.django_db -def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_mock): +def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_oidc): """ Authenticated user should be able to generate a bearer token using OIDC Authorization Code Flow. """ - _generate_and_test_bearer_token(client, keycloak_mock) + _generate_and_test_bearer_token(client, keycloak_oidc) def test_oidc_list_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) check_http_get_response(client, url, status_code=403) @pytest.mark.django_db -def test_oidc_list_bearer_tokens(client, keycloak_mock): +def test_oidc_list_bearer_tokens(client, keycloak_oidc): """ User with correct credentials should be allowed to list his tokens. """ nb_tokens = 3 for _ in range(nb_tokens): - _generate_and_test_bearer_token(client, keycloak_mock) + _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) response = check_http_get_response(client, url, status_code=200) tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"])) for oidc_token in OIDCUserOfflineTokens.objects.all(): assert ( oidc_token.creation_date.isoformat() == tokens_data[oidc_token.id - 1]["creation_date"] ) def test_oidc_get_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-get-bearer-token") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db -def test_oidc_get_bearer_token(client, keycloak_mock): +def test_oidc_get_bearer_token(client, keycloak_oidc): """ User with correct credentials should be allowed to display a token. """ nb_tokens = 3 for i in range(nb_tokens): - token = _generate_and_test_bearer_token(client, keycloak_mock) + token = _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=200, data={"token_id": i + 1}, content_type="text/plain", ) assert response.content == token def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-revoke-bearer-tokens") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db -def test_oidc_revoke_bearer_tokens(client, keycloak_mock): +def test_oidc_revoke_bearer_tokens(client, keycloak_oidc): """ User with correct credentials should be allowed to revoke tokens. """ nb_tokens = 3 for _ in range(nb_tokens): - _generate_and_test_bearer_token(client, keycloak_mock) + _generate_and_test_bearer_token(client, keycloak_oidc) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( client, url, status_code=200, data={"token_ids": [1]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 2 check_http_post_response( client, url, status_code=200, data={"token_ids": [2, 3]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 0 def test_oidc_profile_view_anonymous_user(client): """ Non authenticated users should be redirected to login page when requesting profile view. """ url = reverse("oidc-profile") login_url = reverse("oidc-login", query_params={"next_path": url}) - resp = check_html_get_response(client, url, status_code=302) + resp = check_http_get_response(client, url, status_code=302) assert resp["location"] == login_url @pytest.mark.django_db -def test_oidc_profile_view(client, keycloak_mock): +def test_oidc_profile_view(client, keycloak_oidc): """ Authenticated users should be able to request the profile page and link to Keycloak account UI should be present. """ url = reverse("oidc-profile") kc_config = get_config()["keycloak"] user_permissions = ["perm1", "perm2"] - keycloak_mock.user_permissions = user_permissions + keycloak_oidc.user_permissions = user_permissions client.login(code="", code_verifier="", redirect_uri="") resp = check_html_get_response( client, url, status_code=200, template_used="auth/profile.html" ) user = resp.wsgi_request.user kc_account_url = ( f"{kc_config['server_url']}realms/{kc_config['realm_name']}/account/" ) assert_contains(resp, kc_account_url) assert_contains(resp, user.username) assert_contains(resp, user.first_name) assert_contains(resp, user.last_name) assert_contains(resp, user.email) for perm in user_permissions: assert_contains(resp, perm) diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py index b84632d9..0bb1510e 100644 --- a/swh/web/tests/conftest.py +++ b/swh/web/tests/conftest.py @@ -1,382 +1,376 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import shutil from subprocess import PIPE, run import sys from typing import Any, Dict, List, Optional from hypothesis import HealthCheck from hypothesis import __version_info__ as hypothesis_version from hypothesis import settings import pytest from django.core.cache import cache from rest_framework.test import APIClient, APIRequestFactory -from swh.auth.pytest_plugin import keycloak_mock_factory from swh.model.hashutil import ALGORITHMS, hash_to_bytes from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.common import converters from swh.web.common.typing import OriginVisitInfo from swh.web.config import get_config from swh.web.tests.data import get_tests_data, override_storages +pytest_plugins = ["swh.auth.pytest_plugin"] + # Used to skip some tests ctags_json_missing = ( shutil.which("ctags") is None or b"+json" not in run(["ctags", "--version"], stdout=PIPE).stdout ) fossology_missing = shutil.which("nomossa") is None # Register some hypothesis profiles settings.register_profile("default", settings()) suppress_health_check = [HealthCheck.too_slow, HealthCheck.filter_too_much] if hypothesis_version >= (5, 49): suppress_health_check.append(HealthCheck.function_scoped_fixture) settings.register_profile( "swh-web", settings(deadline=None, suppress_health_check=suppress_health_check,), ) settings.register_profile( "swh-web-fast", settings( deadline=None, max_examples=1, suppress_health_check=suppress_health_check, ), ) def pytest_configure(config): # Use fast hypothesis profile by default if none has been # explicitly specified in pytest option if config.getoption("--hypothesis-profile") is None: settings.load_profile("swh-web-fast") # Small hack in order to be able to run the unit tests # without static assets generated by webpack. # Those assets are not really needed for the Python tests # but the django templates will fail to load due to missing # generated file webpack-stats.json describing the js and css # files to include. # So generate a dummy webpack-stats.json file to overcome # that issue. test_dir = os.path.dirname(__file__) # location of the static folder when running tests through tox static_dir = os.path.join(sys.prefix, "share/swh/web/static") if not os.path.exists(static_dir): # location of the static folder when running tests locally with pytest static_dir = os.path.join(test_dir, "../../../static") webpack_stats = os.path.join(static_dir, "webpack-stats.json") if os.path.exists(webpack_stats): return bundles_dir = os.path.join(test_dir, "../assets/src/bundles") _, dirs, _ = next(os.walk(bundles_dir)) mock_webpack_stats = {"status": "done", "publicPath": "/static", "chunks": {}} for bundle in dirs: asset = "js/%s.js" % bundle mock_webpack_stats["chunks"][bundle] = [ { "name": asset, "publicPath": "/static/%s" % asset, "path": os.path.join(static_dir, asset), } ] with open(webpack_stats, "w") as outfile: json.dump(mock_webpack_stats, outfile) # Clear Django cache before each test @pytest.fixture(autouse=True) def django_cache_cleared(): cache.clear() # Alias rf fixture from pytest-django @pytest.fixture def request_factory(rf): return rf # Fixture to get test client from Django REST Framework @pytest.fixture(scope="module") def api_client(): return APIClient() # Fixture to get API request factory from Django REST Framework @pytest.fixture(scope="module") def api_request_factory(): return APIRequestFactory() # Initialize tests data @pytest.fixture(scope="session", autouse=True) def tests_data(): data = get_tests_data(reset=True) # Update swh-web configuration to use the in-memory storages # instantiated in the tests.data module override_storages(data["storage"], data["idx_storage"], data["search"]) return data # Fixture to manipulate data from a sample archive used in the tests @pytest.fixture(scope="session") def archive_data(tests_data): return _ArchiveData(tests_data) # Fixture to manipulate indexer data from a sample archive used in the tests @pytest.fixture(scope="session") def indexer_data(tests_data): return _IndexerData(tests_data) # Custom data directory for requests_mock @pytest.fixture def datadir(): return os.path.join(os.path.abspath(os.path.dirname(__file__)), "resources") class _ArchiveData: """ Helper class to manage data from a sample test archive. It is initialized with a reference to an in-memory storage containing raw tests data. It is basically a proxy to Storage interface but it overrides some methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.storage = tests_data["storage"] def __getattr__(self, key): if key == "storage": raise AttributeError(key) # Forward calls to non overridden Storage methods to wrapped # storage instance return getattr(self.storage, key) def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]: cnt_ids_bytes = { algo_hash: hash_to_bytes(content[algo_hash]) for algo_hash in ALGORITHMS if content.get(algo_hash) } cnt = self.storage.content_find(cnt_ids_bytes) return converters.from_content(cnt[0].to_dict()) if cnt else cnt def content_get(self, cnt_id: str) -> Dict[str, Any]: cnt_id_bytes = hash_to_bytes(cnt_id) content = self.storage.content_get([cnt_id_bytes])[0] if content: content_d = content.to_dict() content_d.pop("ctime", None) else: content_d = None return converters.from_swh( content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"} ) def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]: cnt_id_bytes = hash_to_bytes(cnt_id) cnt_data = self.storage.content_get_data(cnt_id_bytes) if cnt_data is None: return None return converters.from_content({"data": cnt_data, "sha1": cnt_id_bytes}) def directory_get(self, dir_id): return {"id": dir_id, "content": self.directory_ls(dir_id)} def directory_ls(self, dir_id): cnt_id_bytes = hash_to_bytes(dir_id) dir_content = map( converters.from_directory_entry, self.storage.directory_ls(cnt_id_bytes) ) return list(dir_content) def release_get(self, rel_id: str) -> Optional[Dict[str, Any]]: rel_id_bytes = hash_to_bytes(rel_id) rel_data = self.storage.release_get([rel_id_bytes])[0] return converters.from_release(rel_data) if rel_data else None def revision_get(self, rev_id: str) -> Optional[Dict[str, Any]]: rev_id_bytes = hash_to_bytes(rev_id) rev_data = self.storage.revision_get([rev_id_bytes])[0] return converters.from_revision(rev_data) if rev_data else None def revision_log(self, rev_id, limit=None): rev_id_bytes = hash_to_bytes(rev_id) return list( map( converters.from_revision, self.storage.revision_log([rev_id_bytes], limit=limit), ) ) def snapshot_get_latest(self, origin_url): snp = snapshot_get_latest(self.storage, origin_url) return converters.from_snapshot(snp.to_dict()) def origin_get(self, origin_urls): origins = self.storage.origin_get(origin_urls) return [converters.from_origin(o.to_dict()) for o in origins] def origin_visit_get(self, origin_url): next_page_token = None visits = [] while True: visit_page = self.storage.origin_visit_get( origin_url, page_token=next_page_token ) next_page_token = visit_page.next_page_token for visit in visit_page.results: visit_status = self.storage.origin_visit_status_get_latest( origin_url, visit.visit ) visits.append( converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) ) if not next_page_token: break return visits def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo: visit = self.storage.origin_visit_get_by(origin_url, visit_id) assert visit is not None visit_status = self.storage.origin_visit_status_get_latest(origin_url, visit_id) assert visit_status is not None return converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) def origin_visit_status_get_latest( self, origin_url, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ): visit_status = origin_get_latest_visit_status( self.storage, origin_url, type=type, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, ) return ( converters.from_origin_visit(visit_status.to_dict()) if visit_status else None ) def snapshot_get(self, snapshot_id): snp = snapshot_get_all_branches(self.storage, hash_to_bytes(snapshot_id)) return converters.from_snapshot(snp.to_dict()) def snapshot_get_branches( self, snapshot_id, branches_from="", branches_count=1000, target_types=None ): partial_branches = self.storage.snapshot_get_branches( hash_to_bytes(snapshot_id), branches_from.encode(), branches_count, target_types, ) return converters.from_partial_branches(partial_branches) def snapshot_get_head(self, snapshot): if snapshot["branches"]["HEAD"]["target_type"] == "alias": target = snapshot["branches"]["HEAD"]["target"] head = snapshot["branches"][target]["target"] else: head = snapshot["branches"]["HEAD"]["target"] return head def snapshot_count_branches(self, snapshot_id): counts = dict.fromkeys(("alias", "release", "revision"), 0) counts.update(self.storage.snapshot_count_branches(hash_to_bytes(snapshot_id))) counts.pop(None, None) return counts class _IndexerData: """ Helper class to manage indexer tests data It is initialized with a reference to an in-memory indexer storage containing raw tests data. It also defines class methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.idx_storage = tests_data["idx_storage"] self.mimetype_indexer = tests_data["mimetype_indexer"] self.license_indexer = tests_data["license_indexer"] self.ctags_indexer = tests_data["ctags_indexer"] def content_add_mimetype(self, cnt_id): self.mimetype_indexer.run([hash_to_bytes(cnt_id)]) def content_get_mimetype(self, cnt_id): mimetype = self.idx_storage.content_mimetype_get([hash_to_bytes(cnt_id)])[ 0 ].to_dict() return converters.from_filetype(mimetype) def content_add_license(self, cnt_id): self.license_indexer.run([hash_to_bytes(cnt_id)]) def content_get_license(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) licenses = self.idx_storage.content_fossology_license_get([cnt_id_bytes]) for license in licenses: yield converters.from_swh(license.to_dict(), hashess={"id"}) def content_add_ctags(self, cnt_id): self.ctags_indexer.run([hash_to_bytes(cnt_id)]) def content_get_ctags(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) ctags = self.idx_storage.content_ctags_get([cnt_id_bytes]) for ctag in ctags: yield converters.from_swh(ctag, hashess={"id"}) -_keycloak_config = get_config()["keycloak"] - -_keycloak_mock = keycloak_mock_factory( - server_url=_keycloak_config["server_url"], - realm_name=_keycloak_config["realm_name"], - client_id=OIDC_SWH_WEB_CLIENT_ID, -) +@pytest.fixture +def keycloak_oidc(keycloak_oidc, mocker): + keycloak_config = get_config()["keycloak"] + keycloak_oidc.server_url = keycloak_config["server_url"] + keycloak_oidc.realm_name = keycloak_config["realm_name"] + keycloak_oidc.client_id = OIDC_SWH_WEB_CLIENT_ID -@pytest.fixture -def keycloak_mock(_keycloak_mock, mocker): - for oidc_client_factory in ( - "swh.web.auth.views.get_oidc_client", - "swh.web.auth.backends.get_oidc_client", - ): - mock_get_oidc_client = mocker.patch(oidc_client_factory) - mock_get_oidc_client.return_value = _keycloak_mock + keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client") + keycloak_oidc_client.return_value = keycloak_oidc - return _keycloak_mock + return keycloak_oidc