diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py index 4f99c7ad..f0022893 100644 --- a/swh/web/auth/backends.py +++ b/swh/web/auth/backends.py @@ -1,190 +1,190 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta import hashlib from typing import Any, Dict, Optional import sentry_sdk from django.core.cache import cache from django.http import HttpRequest from django.utils import timezone from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed, ValidationError -from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.models import OIDCUser from swh.web.auth.utils import get_oidc_client -# OpenID Connect client to communicate with Keycloak server -_oidc_client: KeycloakOpenIDConnect = get_oidc_client() - def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int("".join(decoded_token["sub"].split("-")), 16) # create a Django user that will not be saved to database user = OIDCUser( id=user_id, username=decoded_token["preferred_username"], password="", first_name=decoded_token["given_name"], last_name=decoded_token["family_name"], email=decoded_token["email"], ) # set is_staff user property based on groups if "groups" in decoded_token: user.is_staff = "/staff" in decoded_token["groups"] # extract user permissions if any resource_access = decoded_token.get("resource_access", {}) - client_resource_access = resource_access.get(_oidc_client.client_id, {}) + client_resource_access = resource_access.get(get_oidc_client().client_id, {}) user.permissions = set(client_resource_access.get("roles", [])) # add user sub to custom User proxy model user.sub = decoded_token["sub"] return user def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: + oidc_client = get_oidc_client() + # decode JWT token try: access_token = oidc_profile["access_token"] - decoded_token = _oidc_client.decode_token(access_token) + decoded_token = oidc_client.decode_token(access_token) # access token has expired or is invalid except Exception: # get a new access token from authentication provider - oidc_profile = _oidc_client.refresh_token(oidc_profile["refresh_token"]) + oidc_profile = oidc_client.refresh_token(oidc_profile["refresh_token"]) # decode access token - decoded_token = _oidc_client.decode_token(oidc_profile["access_token"]) + decoded_token = oidc_client.decode_token(oidc_profile["access_token"]) # create OIDCUser from decoded token user = _oidc_user_from_decoded_token(decoded_token) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) # compute OIDC tokens expiration date oidc_profile["expires_at"] = exp_datetime oidc_profile["refresh_expires_at"] = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) # put OIDC profile in cache or update it after token renewal cache_key = f"oidc_user_{user.id}" if cache.get(cache_key) is None or access_token != oidc_profile["access_token"]: # set cache key TTL as refresh token expiration time assert user.refresh_expires_at ttl = int(user.refresh_expires_at.timestamp() - timezone.now().timestamp()) # save oidc_profile in cache cache.set(cache_key, oidc_profile, timeout=max(0, ttl)) return user class OIDCAuthorizationCodePKCEBackend: def authenticate( self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str ) -> Optional[OIDCUser]: user = None try: # try to authenticate user with OIDC PKCE authorization code flow - oidc_profile = _oidc_client.authorization_code( + oidc_profile = get_oidc_client().authorization_code( code, redirect_uri, code_verifier=code_verifier ) # create Django user user = _oidc_user_from_profile(oidc_profile) except Exception as e: sentry_sdk.capture_exception(e) return user def get_user(self, user_id: int) -> Optional[OIDCUser]: # get oidc profile from cache oidc_profile = cache.get(f"oidc_user_{user_id}") if oidc_profile: try: user = _oidc_user_from_profile(oidc_profile) # restore auth backend setattr(user, "backend", f"{__name__}.{self.__class__.__name__}") return user except Exception as e: sentry_sdk.capture_exception(e) return None else: return None class OIDCBearerTokenAuthentication(BaseAuthentication): def authenticate(self, request): auth_header = request.META.get("HTTP_AUTHORIZATION") if auth_header is None: return None try: auth_type, refresh_token = auth_header.split(" ", 1) except ValueError: raise AuthenticationFailed("Invalid HTTP authorization header format") if auth_type != "Bearer": raise AuthenticationFailed( (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).") ) try: + oidc_client = get_oidc_client() + # compute a cache key from the token that does not exceed # memcached key size limit hasher = hashlib.sha1() hasher.update(refresh_token.encode("ascii")) cache_key = f"api_token_{hasher.hexdigest()}" # check if an access token is cached access_token = cache.get(cache_key) # attempt to decode access token try: - decoded_token = _oidc_client.decode_token(access_token) + decoded_token = oidc_client.decode_token(access_token) except Exception: # access token is None or it has expired decoded_token = None if access_token is None or decoded_token is None: # get a new access token from authentication provider - access_token = _oidc_client.refresh_token(refresh_token)["access_token"] + access_token = oidc_client.refresh_token(refresh_token)["access_token"] # decode access token - decoded_token = _oidc_client.decode_token(access_token) + decoded_token = oidc_client.decode_token(access_token) # compute access token expiration exp = datetime.fromtimestamp(decoded_token["exp"]) ttl = int(exp.timestamp() - timezone.now().timestamp()) # save access token in cache while it is valid cache.set(cache_key, access_token, timeout=max(0, ttl)) # create Django user user = _oidc_user_from_decoded_token(decoded_token) except UnicodeEncodeError as e: sentry_sdk.capture_exception(e) raise ValidationError("Invalid bearer token") except Exception as e: sentry_sdk.capture_exception(e) raise AuthenticationFailed(str(e)) return user, None diff --git a/swh/web/auth/keycloak.py b/swh/web/auth/keycloak.py deleted file mode 100644 index e457a6cc..00000000 --- a/swh/web/auth/keycloak.py +++ /dev/null @@ -1,168 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from typing import Any, Dict, Optional, Tuple -from urllib.parse import urlencode - -from keycloak import KeycloakOpenID - - -class KeycloakOpenIDConnect: - """ - Wrapper class around python-keycloak to ease the interaction with Keycloak - for managing authentication and user permissions with OpenID Connect. - """ - - def __init__( - self, - server_url: str, - realm_name: str, - client_id: str, - realm_public_key: str = "", - ): - """ - Args: - server_url: URL of the Keycloak server - realm_name: The realm name - client_id: The OpenID Connect client identifier - realm_public_key: The realm public key (will be dynamically - retrieved if not provided) - """ - self._keycloak = KeycloakOpenID( - server_url=server_url, client_id=client_id, realm_name=realm_name, - ) - - self.server_url = server_url - self.realm_name = realm_name - self.client_id = client_id - self.realm_public_key = realm_public_key - - def well_known(self) -> Dict[str, Any]: - """ - Retrieve the OpenID Connect Well-Known URI registry from Keycloak. - - Returns: - A dictionary filled with OpenID Connect URIS. - """ - return self._keycloak.well_know() - - def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: - """ - Get OpenID Connect authorization URL to authenticate users. - - Args: - redirect_uri: URI to redirect to once a user is authenticated - extra_params: Extra query parameters to add to the - authorization URL - """ - auth_url = self._keycloak.auth_url(redirect_uri) - if extra_params: - auth_url += "&%s" % urlencode(extra_params) - return auth_url - - def authorization_code( - self, code: str, redirect_uri: str, **extra_params: str - ) -> Dict[str, Any]: - """ - Get OpenID Connect authentication tokens using Authorization - Code flow. - - Args: - code: Authorization code provided by Keycloak - redirect_uri: URI to redirect to once a user is authenticated - (must be the same as the one provided to authorization_url): - extra_params: Extra parameters to add in the authorization request - payload. - """ - return self._keycloak.token( - grant_type="authorization_code", - code=code, - redirect_uri=redirect_uri, - **extra_params, - ) - - def refresh_token(self, refresh_token: str) -> Dict[str, Any]: - """ - Request a new access token from Keycloak using a refresh token. - - Args: - refresh_token: A refresh token provided by Keycloak - - Returns: - A dictionary filled with tokens info - """ - return self._keycloak.refresh_token(refresh_token) - - def decode_token( - self, token: str, options: Optional[Dict[str, Any]] = None - ) -> Dict[str, Any]: - """ - Try to decode a JWT token. - - Args: - token: A JWT token to decode - options: Options for jose.jwt.decode - - Returns: - A dictionary filled with decoded token content - """ - if not self.realm_public_key: - realm_public_key = self._keycloak.public_key() - self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" - self.realm_public_key += realm_public_key - self.realm_public_key += "\n-----END PUBLIC KEY-----" - - return self._keycloak.decode_token( - token, key=self.realm_public_key, options=options - ) - - def logout(self, refresh_token: str) -> None: - """ - Logout a user by closing its authenticated session. - - Args: - refresh_token: A refresh token provided by Keycloak - """ - self._keycloak.logout(refresh_token) - - def userinfo(self, access_token: str) -> Dict[str, Any]: - """ - Return user information from its access token. - - Args: - access_token: An access token provided by Keycloak - - Returns: - A dictionary fillled with user information - """ - return self._keycloak.userinfo(access_token) - - -# stores instances of KeycloakOpenIDConnect class -# dict keys are (realm_name, client_id) tuples -_keycloak_oidc: Dict[Tuple[str, str], KeycloakOpenIDConnect] = {} - - -def get_keycloak_oidc_client( - server_url: str, realm_name: str, client_id: str -) -> KeycloakOpenIDConnect: - """ - Instantiate a KeycloakOpenIDConnect class for a given client in a - given realm. - - Args: - server_url: Base URL of a Keycloak server - realm_name: Name of the realm in Keycloak - client_id: Client identifier in the realm - - Returns: - An object to ease the interaction with the Keycloak server - """ - realm_client_key = (realm_name, client_id) - if realm_client_key not in _keycloak_oidc: - _keycloak_oidc[realm_client_key] = KeycloakOpenIDConnect( - server_url, realm_name, client_id - ) - return _keycloak_oidc[realm_client_key] diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py index 54c6dcbb..87f12fb1 100644 --- a/swh/web/auth/utils.py +++ b/swh/web/auth/utils.py @@ -1,122 +1,128 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import urlsafe_b64encode import hashlib import secrets -from typing import Tuple +from typing import Dict, Tuple from cryptography.fernet import Fernet from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -from swh.web.auth.keycloak import KeycloakOpenIDConnect, get_keycloak_oidc_client +from swh.auth.keycloak import KeycloakOpenIDConnect from swh.web.config import get_config def gen_oidc_pkce_codes() -> Tuple[str, str]: """ Generates a code verifier and a code challenge to be used with the OpenID Connect authorization code flow with PKCE ("Proof Key for Code Exchange", see https://tools.ietf.org/html/rfc7636). PKCE replaces the static secret used in the standard authorization code flow with a temporary one-time challenge, making it feasible to use in public clients. The implementation is inspired from that blog post: https://www.stefaanlippens.net/oauth-code-flow-pkce.html """ # generate a code verifier which is a long enough random alphanumeric # string, only to be used "client side" code_verifier_str = secrets.token_urlsafe(60) # create the PKCE code challenge by hashing the code verifier with SHA256 # and encoding the result in URL-safe base64 (without padding) code_challenge = hashlib.sha256(code_verifier_str.encode("ascii")).digest() code_challenge_str = urlsafe_b64encode(code_challenge).decode("ascii") code_challenge_str = code_challenge_str.replace("=", "") return code_verifier_str, code_challenge_str OIDC_SWH_WEB_CLIENT_ID = "swh-web" def _get_fernet(password: bytes, salt: bytes) -> Fernet: """ Instantiate a Fernet system from a password and a salt value (see https://cryptography.io/en/latest/fernet/). Args: password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The Fernet system """ kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend(), ) key = urlsafe_b64encode(kdf.derive(password)) return Fernet(key) def encrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes: """ Encrypt data using Fernet system (symmetric encryption). Args: data: input data to encrypt password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The encrypted data """ return _get_fernet(password, salt).encrypt(data) def decrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes: """ Decrypt data using Fernet system (symmetric encryption). Args: data: input data to decrypt password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The decrypted data """ return _get_fernet(password, salt).decrypt(data) +# stores instances of KeycloakOpenIDConnect class +# dict keys are (realm_name, client_id) tuples +_keycloak_oidc: Dict[str, KeycloakOpenIDConnect] = {} + + def get_oidc_client(client_id: str = OIDC_SWH_WEB_CLIENT_ID) -> KeycloakOpenIDConnect: """ Instantiate a KeycloakOpenIDConnect class for a given client in the SoftwareHeritage realm. Args: client_id: client identifier in the SoftwareHeritage realm Returns: An object to ease the interaction with the Keycloak server """ - swhweb_config = get_config() - return get_keycloak_oidc_client( - swhweb_config["keycloak"]["server_url"], - swhweb_config["keycloak"]["realm_name"], - client_id, - ) + keycloak_config = get_config()["keycloak"] + + if client_id not in _keycloak_oidc: + _keycloak_oidc[client_id] = KeycloakOpenIDConnect( + keycloak_config["server_url"], keycloak_config["realm_name"], client_id + ) + return _keycloak_oidc[client_id] diff --git a/swh/web/tests/api/views/test_graph.py b/swh/web/tests/api/views/test_graph.py index a39b3c84..7b4a65f4 100644 --- a/swh/web/tests/api/views/test_graph.py +++ b/swh/web/tests/api/views/test_graph.py @@ -1,262 +1,261 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import json import textwrap from hypothesis import given from django.http.response import StreamingHttpResponse from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import ExtendedObjectType, ExtendedSWHID from swh.web.api.views.graph import API_GRAPH_PERM from swh.web.common.utils import reverse from swh.web.config import SWH_WEB_INTERNAL_SERVER_NAME, get_config -from swh.web.tests.auth.keycloak_mock import mock_keycloak -from swh.web.tests.auth.sample_data import oidc_profile from swh.web.tests.strategies import origin from swh.web.tests.utils import check_http_get_response def test_graph_endpoint_no_authentication_for_vpn_users(api_client, requests_mock): graph_query = "stats" url = reverse("api-1-graph", url_args={"graph_query": graph_query}) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json={}, headers={"Content-Type": "application/json"}, ) check_http_get_response( api_client, url, status_code=200, server_name=SWH_WEB_INTERNAL_SERVER_NAME ) def test_graph_endpoint_needs_authentication(api_client): url = reverse("api-1-graph", url_args={"graph_query": "stats"}) check_http_get_response(api_client, url, status_code=401) -def _authenticate_graph_user(api_client, mocker): - mock_keycloak(mocker, user_permissions=[API_GRAPH_PERM]) +def _authenticate_graph_user(api_client, keycloak_mock): + keycloak_mock.user_permissions = [API_GRAPH_PERM] + oidc_profile = keycloak_mock.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") -def test_graph_endpoint_needs_permission(api_client, mocker, requests_mock): +def test_graph_endpoint_needs_permission(api_client, keycloak_mock, requests_mock): graph_query = "stats" url = reverse("api-1-graph", url_args={"graph_query": graph_query}) + oidc_profile = keycloak_mock.login() api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") - mock_keycloak(mocker, user_permissions=[]) check_http_get_response(api_client, url, status_code=403) - _authenticate_graph_user(api_client, mocker) + _authenticate_graph_user(api_client, keycloak_mock) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json={}, headers={"Content-Type": "application/json"}, ) check_http_get_response(api_client, url, status_code=200) -def test_graph_text_plain_response(api_client, mocker, requests_mock): - _authenticate_graph_user(api_client, mocker) +def test_graph_text_plain_response(api_client, keycloak_mock, requests_mock): + _authenticate_graph_user(api_client, keycloak_mock) graph_query = "leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323" response_text = textwrap.dedent( """\ swh:1:cnt:1d3dace0a825b0535c37c53ed669ef817e9c1b47 swh:1:cnt:6d5b280f4e33589ae967a7912a587dd5cb8dedaa swh:1:cnt:91bef238bf01356a550d416d14bb464c576ac6f4 swh:1:cnt:58a8b925a463b87d49639fda282b8f836546e396 swh:1:cnt:fd32ee0a87e16ccc853dfbeb7018674f9ce008c0 swh:1:cnt:ab7c39871872589a4fc9e249ebc927fb1042c90d swh:1:cnt:93073c02bf3869845977527de16af4d54765838d swh:1:cnt:4251f795b52c54c447a97c9fe904d8b1f993b1e0 swh:1:cnt:c6e7055424332006d07876ffeba684e7e284b383 swh:1:cnt:8459d8867dc3b15ef7ae9683e21cccc9ab2ec887 swh:1:cnt:5f9981d52202815aa947f85b9dfa191b66f51138 swh:1:cnt:00a685ec51bcdf398c15d588ecdedb611dbbab4b swh:1:cnt:e1cf1ea335106a0197a2f92f7804046425a7d3eb swh:1:cnt:07069b38087f88ec192d2c9aff75a502476fd17d swh:1:cnt:f045ee845c7f14d903a2c035b2691a7c400c01f0 """ ) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_text, headers={"Content-Type": "text/plain", "Transfer-Encoding": "chunked"}, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) resp = check_http_get_response( api_client, url, status_code=200, content_type="text/plain" ) assert isinstance(resp, StreamingHttpResponse) assert b"".join(resp.streaming_content) == response_text.encode() _response_json = { "counts": {"nodes": 17075708289, "edges": 196236587976}, "ratios": { "compression": 0.16, "bits_per_node": 58.828, "bits_per_edge": 5.119, "avg_locality": 2184278529.729, }, "indegree": {"min": 0, "max": 263180117, "avg": 11.4921492364925}, "outdegree": {"min": 0, "max": 1033207, "avg": 11.4921492364925}, } -def test_graph_json_response(api_client, mocker, requests_mock): - _authenticate_graph_user(api_client, mocker) +def test_graph_json_response(api_client, keycloak_mock, requests_mock): + _authenticate_graph_user(api_client, keycloak_mock) graph_query = "stats" requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json=_response_json, headers={"Content-Type": "application/json"}, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == "application/json" assert resp.content == json.dumps(_response_json).encode() -def test_graph_ndjson_response(api_client, mocker, requests_mock): - _authenticate_graph_user(api_client, mocker) +def test_graph_ndjson_response(api_client, keycloak_mock, requests_mock): + _authenticate_graph_user(api_client, keycloak_mock) graph_query = "visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb" response_ndjson = textwrap.dedent( """\ ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:acfb7cabd63b368a03a9df87670ece1488c8bce0"] ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:2a0837708151d76edf28fdbb90dc3eabc676cff3"] ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb",\ "swh:1:cnt:eaf025ad54b94b2fdda26af75594cfae3491ec75"] """ ) requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_ndjson, headers={ "Content-Type": "application/x-ndjson", "Transfer-Encoding": "chunked", }, ) url = reverse("api-1-graph", url_args={"graph_query": graph_query}) resp = check_http_get_response(api_client, url, status_code=200) assert isinstance(resp, StreamingHttpResponse) assert resp["Content-Type"] == "application/x-ndjson" assert b"".join(resp.streaming_content) == response_ndjson.encode() @given(origin()) def test_graph_response_resolve_origins( - archive_data, api_client, mocker, requests_mock, origin + archive_data, api_client, keycloak_mock, requests_mock, origin ): hasher = hashlib.sha1() hasher.update(origin["url"].encode()) origin_sha1 = hasher.digest() origin_swhid = str( ExtendedSWHID(object_type=ExtendedObjectType.ORIGIN, object_id=origin_sha1) ) snapshot = archive_data.snapshot_get_latest(origin["url"])["id"] snapshot_swhid = str( ExtendedSWHID( object_type=ExtendedObjectType.SNAPSHOT, object_id=hash_to_bytes(snapshot) ) ) - _authenticate_graph_user(api_client, mocker) + _authenticate_graph_user(api_client, keycloak_mock) for graph_query, response_text, content_type in ( ( f"visit/nodes/{snapshot_swhid}", f"{snapshot_swhid}\n{origin_swhid}\n", "text/plain", ), ( f"visit/edges/{snapshot_swhid}", f"{snapshot_swhid} {origin_swhid}\n", "text/plain", ), ( f"visit/paths/{snapshot_swhid}", f'["{snapshot_swhid}", "{origin_swhid}"]\n', "application/x-ndjson", ), ): # set two lines response to check resolved origins cache response_text = response_text + response_text requests_mock.get( get_config()["graph"]["server_url"] + graph_query, text=response_text, headers={"Content-Type": content_type, "Transfer-Encoding": "chunked"}, ) url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"direction": "backward"}, ) resp = check_http_get_response(api_client, url, status_code=200) assert isinstance(resp, StreamingHttpResponse) assert resp["Content-Type"] == content_type assert b"".join(resp.streaming_content) == response_text.encode() url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"direction": "backward", "resolve_origins": "true"}, ) resp = check_http_get_response(api_client, url, status_code=200) assert isinstance(resp, StreamingHttpResponse) assert resp["Content-Type"] == content_type assert ( b"".join(resp.streaming_content) == response_text.replace(origin_swhid, origin["url"]).encode() ) def test_graph_response_resolve_origins_nothing_to_do( - api_client, mocker, requests_mock + api_client, keycloak_mock, requests_mock ): - _authenticate_graph_user(api_client, mocker) + _authenticate_graph_user(api_client, keycloak_mock) graph_query = "stats" requests_mock.get( get_config()["graph"]["server_url"] + graph_query, json=_response_json, headers={"Content-Type": "application/json"}, ) url = reverse( "api-1-graph", url_args={"graph_query": graph_query}, query_params={"resolve_origins": "true"}, ) resp = check_http_get_response(api_client, url, status_code=200) assert resp.content_type == "application/json" assert resp.content == json.dumps(_response_json).encode() diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py deleted file mode 100644 index 7e61b061..00000000 --- a/swh/web/tests/auth/keycloak_mock.py +++ /dev/null @@ -1,117 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from copy import copy -from unittest.mock import Mock - -from keycloak.exceptions import KeycloakError - -from django.utils import timezone - -from swh.web.auth.keycloak import KeycloakOpenIDConnect -from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID -from swh.web.config import get_config - -from .sample_data import oidc_profile, realm_public_key, userinfo - - -class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): - def __init__( - self, auth_success=True, exp=None, user_groups=[], user_permissions=[] - ): - swhweb_config = get_config() - super().__init__( - swhweb_config["keycloak"]["server_url"], - swhweb_config["keycloak"]["realm_name"], - OIDC_SWH_WEB_CLIENT_ID, - ) - self.auth_success = auth_success - self.exp = exp - self.user_groups = user_groups - self.user_permissions = user_permissions - self._keycloak.public_key = lambda: realm_public_key - self._keycloak.well_know = lambda: { - "issuer": f"{self.server_url}realms/{self.realm_name}", - "authorization_endpoint": ( - f"{self.server_url}realms/" - f"{self.realm_name}/protocol/" - "openid-connect/auth" - ), - "token_endpoint": ( - f"{self.server_url}realms/{self.realm_name}/" - "protocol/openid-connect/token" - ), - "token_introspection_endpoint": ( - f"{self.server_url}realms/" - f"{self.realm_name}/protocol/" - "openid-connect/token/" - "introspect" - ), - "userinfo_endpoint": ( - f"{self.server_url}realms/{self.realm_name}/" - "protocol/openid-connect/userinfo" - ), - "end_session_endpoint": ( - f"{self.server_url}realms/" - f"{self.realm_name}/protocol/" - "openid-connect/logout" - ), - "jwks_uri": ( - f"{self.server_url}realms/{self.realm_name}/" - "protocol/openid-connect/certs" - ), - } - self.authorization_code = Mock() - self.refresh_token = Mock() - self.userinfo = Mock() - self.logout = Mock() - if auth_success: - self.authorization_code.return_value = copy(oidc_profile) - self.refresh_token.return_value = copy(oidc_profile) - self.userinfo.return_value = copy(userinfo) - else: - self.authorization_url = Mock() - exception = KeycloakError( - error_message="Authentication failed", response_code=401 - ) - self.authorization_code.side_effect = exception - self.authorization_url.side_effect = exception - self.refresh_token.side_effect = exception - self.userinfo.side_effect = exception - self.logout.side_effect = exception - - def decode_token(self, token): - options = {} - if self.auth_success: - # skip signature expiration check as we use a static oidc_profile - # for the tests with expired tokens in it - options["verify_exp"] = False - decoded = super().decode_token(token, options) - # tweak auth and exp time for tests - expire_in = decoded["exp"] - decoded["iat"] - if self.exp is not None: - decoded["exp"] = self.exp - decoded["iat"] = self.exp - expire_in - else: - decoded["iat"] = int(timezone.now().timestamp()) - decoded["exp"] = decoded["iat"] + expire_in - decoded["groups"] = self.user_groups - if self.user_permissions: - decoded["resource_access"][self.client_id] = { - "roles": self.user_permissions - } - return decoded - - -def mock_keycloak( - mocker, auth_success=True, exp=None, user_groups=[], user_permissions=[] -): - kc_oidc_mock = KeycloackOpenIDConnectMock( - auth_success, exp, user_groups, user_permissions - ) - mock_get_oidc_client = mocker.patch("swh.web.auth.views.get_oidc_client") - mock_get_oidc_client.return_value = kc_oidc_mock - mocker.patch("swh.web.auth.backends._oidc_client", kc_oidc_mock) - return kc_oidc_mock diff --git a/swh/web/tests/auth/sample_data.py b/swh/web/tests/auth/sample_data.py deleted file mode 100644 index edccc9af..00000000 --- a/swh/web/tests/auth/sample_data.py +++ /dev/null @@ -1,128 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU Affero General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -realm_public_key = ( - "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnqF4xvGjaI54P6WtJvyGayxP8A93u" - "NcA3TH6jitwmyAalj8dN8/NzK9vrdlSA3Ibvp/XQujPSOP7a35YiYFscEJnogTXQpE/FhZrUY" - "y21U6ezruVUv4z/ER1cYLb+q5ZI86nXSTNCAbH+lw7rQjlvcJ9KvgHEeA5ALXJ1r55zUmNvuy" - "5o6ke1G3fXbNSXwF4qlWAzo1o7Ms8qNrNyOG8FPx24dvm9xMH7/08IPvh9KUqlnP8h6olpxHr" - "drX/q4E+Nzj8Tr8p7Z5CimInls40QuOTIhs6C2SwFHUgQgXl9hB9umiZJlwYEpDv0/LO2zYie" - "Hl5Lv7Iig4FOIXIVCaDGQIDAQAB" -) - -oidc_profile = { - "access_token": ( - # decoded token: - # {'acr': '1', - # 'allowed-origins': ['*'], - # 'aud': ['swh-web', 'account'], - # 'auth_time': 1582723101, - # 'azp': 'swh-web', - # 'email': 'john.doe@example.com', - # 'email_verified': False, - # 'exp': 1592396202, - # 'family_name': 'Doe', - # 'given_name': 'John', - # 'groups': ['/staff'], - # 'iat': 1592395601, - # 'iss': 'http://localhost:8080/auth/realms/SoftwareHeritage', - # 'jti': '31fc50b7-bbe5-4f51-91ef-8e3eec51331e', - # 'name': 'John Doe', - # 'nbf': 0, - # 'preferred_username': 'johndoe', - # 'realm_access': {'roles': ['offline_access', 'uma_authorization']}, - # 'resource_access': {'account': {'roles': ['manage-account', - # 'manage-account-links', - # 'view-profile']}}, - # 'scope': 'openid email profile', - # 'session_state': 'd82b90d1-0a94-4e74-ad66-dd95341c7b6d', - # 'sub': 'feacd344-b468-4a65-a236-14f61e6b7200', - # 'typ': 'Bearer' - # } - "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhV" - "Q0p0TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0." - "eyJqdGkiOiIzMWZjNTBiNy1iYmU1LTRmNTEtOTFlZi04ZTNlZWM1MTMz" - "MWUiLCJleHAiOjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIz" - "MTAxLCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFs" - "bXMvU29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6WyJzd2gtd2ViIiwiYWNj" - "b3VudCJdLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRhNjUtYTIzNi0xNGY2" - "MWU2YjcyMDAiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJzd2gtd2ViIiwi" - "YXV0aF90aW1lIjoxNTgyNzIzMTAwLCJzZXNzaW9uX3N0YXRlIjoiZDgy" - "YjkwZDEtMGE5NC00ZTc0LWFkNjYtZGQ5NTM0MWM3YjZkIiwiYWNyIjoi" - "MSIsImFsbG93ZWQtb3JpZ2lucyI6WyIqIl0sInJlYWxtX2FjY2VzcyI6" - "eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0" - "aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xl" - "cyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtz" - "Iiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwg" - "cHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwibmFtZSI6Ikpv" - "aG4gRG9lIiwiZ3JvdXBzIjpbXSwicHJlZmVycmVkX3VzZXJuYW1lIjoi" - "am9obmRvZSIsImdpdmVuX25hbWUiOiJKb2huIiwiZmFtaWx5X25hbWUi" - "OiJEb2UiLCJlbWFpbCI6ImpvaG4uZG9lQGV4YW1wbGUuY29tIn0.neJ-" - "Pmd87J6Gt0fzDqmXFeoy34Iqb5vNNEEgIKqtqg3moaVkbXrO_9R37DJB" - "AgdFv0owVONK3GbqPOEICePgG6RFtri999DetNE-O5sB4fwmHPWcHPlO" - "kcPLbVJqu6zWo-2AzlfAy5bCNvj_wzs2tjFjLeHcRgR1a1WY3uTp5EWc" - "HITCWQZzZWFGZTZCTlGkpdyJTqxGBdSHRB4NlIVGpYSTBsBsxttFEetl" - "rpcNd4-5AteFprIr9hn9VasIIF8WdFdtC2e8xGMJW5Q0M3G3Iu-LLNmE" - "oTIDqtbJ7OrIcGBIwsc3seCV3eCG6kOYwz5w-f8DeOpwcDX58yYPmapJ" - "6A" - ), - "expires_in": 600, - "id_token": ( - "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhVQ0p0" - "TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.eyJqdGki" - "OiI0NDRlYzU1My1iYzhiLTQ2YjYtOTlmYS0zOTc3YTJhZDY1ZmEiLCJleHAi" - "OjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxLCJpc3MiOiJo" - "dHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMvU29mdHdhcmVIZXJp" - "dGFnZSIsImF1ZCI6InN3aC13ZWIiLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRh" - "NjUtYTIzNi0xNGY2MWU2YjcyMDAiLCJ0eXAiOiJJRCIsImF6cCI6InN3aC13" - "ZWIiLCJhdXRoX3RpbWUiOjE1ODI3MjMxMDAsInNlc3Npb25fc3RhdGUiOiJk" - "ODJiOTBkMS0wYTk0LTRlNzQtYWQ2Ni1kZDk1MzQxYzdiNmQiLCJhY3IiOiIx" - "IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJuYW1lIjoiSm9obiBEb2UiLCJn" - "cm91cHMiOltdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2" - "ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoi" - "am9obi5kb2VAZXhhbXBsZS5jb20ifQ.YB7bxlz_wgLJSkylVjmqedxQgEMee" - "JOdi9CFHXV4F3ZWsEZ52CGuJXsozkX2oXvgU06MzzLNEK8ojgrPSNzjRkutL" - "aaLq_YUzv4iV8fmKUS_aEyiYZbfoBe3Y4dwv2FoPEPCt96iTwpzM5fg_oYw_" - "PHCq-Yl5SulT1nTrJZpntkf0hRjmxlDO06JMp0aZ8xS8RYJqH48xCRf_DARE" - "0jJV2-UuzOWI6xBATwFfP44kV6wFmErLN5txMgwZzCSB2OCe5Cl1il0eTQTN" - "ybeSYZeZE61QtuTRUHeP1D1qSbJGy5g_S67SdTkS-hQFvfrrD84qGflIEqnX" - "ZbYnitD1Typ6Q" - ), - "not-before-policy": 0, - "refresh_expires_in": 1800, - "refresh_token": ( - "eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmNjM" - "zMDE5MS01YTU4LTQxMDAtOGIzYS00ZDdlM2U1NjA3MTgifQ.eyJqdGk" - "iOiIxYWI5ZWZmMS0xZWZlLTQ3MDMtOGQ2YS03Nzg1NWUwYzQyYTYiLC" - "JleHAiOjE1ODI3MjQ5MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxL" - "CJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMv" - "U29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6Imh0dHA6Ly9sb2NhbGhvc3Q" - "6ODA4MC9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwic3ViIj" - "oiZmVhY2QzNDQtYjQ2OC00YTY1LWEyMzYtMTRmNjFlNmI3MjAwIiwid" - "HlwIjoiUmVmcmVzaCIsImF6cCI6InN3aC13ZWIiLCJhdXRoX3RpbWUi" - "OjAsInNlc3Npb25fc3RhdGUiOiJkODJiOTBkMS0wYTk0LTRlNzQtYWQ" - "2Ni1kZDk1MzQxYzdiNmQiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOl" - "sib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwic" - "mVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFu" - "YWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXc" - "tcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGVtYWlsIHByb2ZpbG" - "UifQ.xQYrl2CMP_GQ_TFqhsTz-rTs3WuZz5I37toi1eSsDMI" - ), - "scope": "openid email profile", - "session_state": "d82b90d1-0a94-4e74-ad66-dd95341c7b6d", - "token_type": "bearer", -} - -userinfo = { - "email": "john.doe@example.com", - "email_verified": False, - "family_name": "Doe", - "given_name": "John", - "groups": ["/staff"], - "name": "John Doe", - "preferred_username": "johndoe", - "sub": "feacd344-b468-4a65-a236-14f61e6b7200", -} diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py index 8f800372..f65c17f1 100644 --- a/swh/web/tests/auth/test_api_auth.py +++ b/swh/web/tests/auth/test_api_auth.py @@ -1,107 +1,105 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from django.contrib.auth.models import AnonymousUser, User from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse from swh.web.tests.utils import check_api_get_responses, check_http_get_response -from . import sample_data -from .keycloak_mock import mock_keycloak - @pytest.mark.django_db -def test_drf_django_session_auth_success(mocker, client): +def test_drf_django_session_auth_success(keycloak_mock, client): """ Check user gets authenticated when querying the web api through a web browser. """ url = reverse("api-1-stat-counters") - mock_keycloak(mocker) client.login(code="", code_verifier="", redirect_uri="") response = check_http_get_response(client, url, status_code=200) request = response.wsgi_request # user should be authenticated assert isinstance(request.user, OIDCUser) # check remoter used has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_success(mocker, api_client): +def test_drf_oidc_bearer_token_auth_success(keycloak_mock, api_client): """ Check user gets authenticated when querying the web api through an HTTP client using bearer token authentication. """ url = reverse("api-1-stat-counters") - refresh_token = sample_data.oidc_profile["refresh_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] - mock_keycloak(mocker) api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") response = check_api_get_responses(api_client, url, status_code=200) request = response.wsgi_request # user should be authenticated assert isinstance(request.user, OIDCUser) # check remoter used has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_failure(mocker, api_client): +def test_drf_oidc_bearer_token_auth_failure(keycloak_mock, api_client): url = reverse("api-1-stat-counters") - refresh_token = sample_data.oidc_profile["refresh_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] # check for failed authentication but with expected token format - mock_keycloak(mocker, auth_success=False) + keycloak_mock.set_auth_success(False) api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}") response = check_api_get_responses(api_client, url, status_code=403) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) # check for failed authentication when token format is invalid api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà") response = check_api_get_responses(api_client, url, status_code=400) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) -def test_drf_oidc_auth_invalid_or_missing_authorization_type(api_client): +def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_mock, api_client): url = reverse("api-1-stat-counters") - refresh_token = sample_data.oidc_profile["refresh_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] # missing authorization type api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}") response = check_api_get_responses(api_client, url, status_code=403) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) # invalid authorization type api_client.credentials(HTTP_AUTHORIZATION="Foo token") response = check_api_get_responses(api_client, url, status_code=403) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py index 1e3e9470..c38b953e 100644 --- a/swh/web/tests/auth/test_backends.py +++ b/swh/web/tests/auth/test_backends.py @@ -1,268 +1,271 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from unittest.mock import Mock import pytest from django.conf import settings from django.contrib.auth import authenticate, get_backends from rest_framework.exceptions import AuthenticationFailed from swh.web.auth.backends import OIDCBearerTokenAuthentication from swh.web.auth.models import OIDCUser from swh.web.common.utils import reverse -from . import sample_data -from .keycloak_mock import mock_keycloak - def _authenticate_user(request_factory): request = request_factory.get(reverse("oidc-login-complete")) return authenticate( request=request, code="some-code", code_verifier="some-code-verifier", redirect_uri="https://localhost:5004", ) def _check_authenticated_user(user, decoded_token, kc_oidc_mock): assert user is not None assert isinstance(user, OIDCUser) assert user.id != 0 assert user.username == decoded_token["preferred_username"] assert user.password == "" assert user.first_name == decoded_token["given_name"] assert user.last_name == decoded_token["family_name"] assert user.email == decoded_token["email"] assert user.is_staff == ("/staff" in decoded_token["groups"]) assert user.sub == decoded_token["sub"] resource_access = decoded_token.get("resource_access", {}) resource_access_client = resource_access.get(kc_oidc_mock, {}) assert user.permissions == set(resource_access_client.get("roles", [])) @pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_success(mocker, request_factory): +def test_oidc_code_pkce_auth_backend_success(keycloak_mock, request_factory): """ Checks successful login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ - kc_oidc_mock = mock_keycloak(mocker, user_groups=["/staff"]) - oidc_profile = sample_data.oidc_profile + keycloak_mock.user_groups = ["/staff"] + + oidc_profile = keycloak_mock.login() user = _authenticate_user(request_factory) - decoded_token = kc_oidc_mock.decode_token(user.access_token) - _check_authenticated_user(user, decoded_token, kc_oidc_mock) + decoded_token = keycloak_mock.decode_token(user.access_token) + _check_authenticated_user(user, decoded_token, keycloak_mock) auth_datetime = datetime.fromtimestamp(decoded_token["iat"]) exp_datetime = datetime.fromtimestamp(decoded_token["exp"]) refresh_exp_datetime = auth_datetime + timedelta( seconds=oidc_profile["refresh_expires_in"] ) assert user.access_token == oidc_profile["access_token"] assert user.expires_at == exp_datetime assert user.id_token == oidc_profile["id_token"] assert user.refresh_token == oidc_profile["refresh_token"] assert user.refresh_expires_at == refresh_exp_datetime assert user.scope == oidc_profile["scope"] assert user.session_state == oidc_profile["session_state"] backend_path = "swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend" assert user.backend == backend_path backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path) assert get_backends()[backend_idx].get_user(user.id) == user @pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory): +def test_oidc_code_pkce_auth_backend_failure(keycloak_mock, request_factory): """ Checks failed login based on OpenID Connect with PKCE extension Django authentication backend (login from Web UI). """ - mock_keycloak(mocker, auth_success=False) + keycloak_mock.set_auth_success(False) user = _authenticate_user(request_factory) assert user is None @pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_refresh_token_success(mocker, request_factory): +def test_oidc_code_pkce_auth_backend_refresh_token_success( + keycloak_mock, request_factory +): """ Checks access token renewal success using refresh token. """ - kc_oidc_mock = mock_keycloak(mocker) - oidc_profile = sample_data.oidc_profile - decoded_token = kc_oidc_mock.decode_token(oidc_profile["access_token"]) + oidc_profile = keycloak_mock.login() + decoded_token = keycloak_mock.decode_token(oidc_profile["access_token"]) new_access_token = "new_access_token" def _refresh_token(refresh_token): - oidc_profile = dict(sample_data.oidc_profile) + oidc_profile = dict(keycloak_mock.login()) oidc_profile["access_token"] = new_access_token return oidc_profile def _decode_token(access_token): if access_token != new_access_token: raise Exception("access token token has expired") else: return decoded_token - kc_oidc_mock.decode_token = Mock() - kc_oidc_mock.decode_token.side_effect = _decode_token - kc_oidc_mock.refresh_token.side_effect = _refresh_token + keycloak_mock.decode_token = Mock() + keycloak_mock.decode_token.side_effect = _decode_token + keycloak_mock.refresh_token.side_effect = _refresh_token user = _authenticate_user(request_factory) - kc_oidc_mock.refresh_token.assert_called_with( - sample_data.oidc_profile["refresh_token"] - ) + oidc_profile = keycloak_mock.login() + keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is not None @pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_refresh_token_failure(mocker, request_factory): +def test_oidc_code_pkce_auth_backend_refresh_token_failure( + keycloak_mock, request_factory +): """ Checks access token renewal failure using refresh token. """ - kc_oidc_mock = mock_keycloak(mocker) def _refresh_token(refresh_token): raise Exception("OIDC session has expired") def _decode_token(access_token): raise Exception("access token token has expired") - kc_oidc_mock.decode_token = Mock() - kc_oidc_mock.decode_token.side_effect = _decode_token - kc_oidc_mock.refresh_token.side_effect = _refresh_token + keycloak_mock.decode_token = Mock() + keycloak_mock.decode_token.side_effect = _decode_token + keycloak_mock.refresh_token.side_effect = _refresh_token user = _authenticate_user(request_factory) - kc_oidc_mock.refresh_token.assert_called_with( - sample_data.oidc_profile["refresh_token"] - ) + oidc_profile = keycloak_mock.login() + keycloak_mock.refresh_token.assert_called_with(oidc_profile["refresh_token"]) assert user is None @pytest.mark.django_db -def test_oidc_code_pkce_auth_backend_permissions(mocker, request_factory): +def test_oidc_code_pkce_auth_backend_permissions(keycloak_mock, request_factory): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when logging from Web UI. """ permission = "webapp.some-permission" - mock_keycloak(mocker, user_permissions=[permission]) + keycloak_mock.user_permissions = [permission] user = _authenticate_user(request_factory) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") @pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_success(mocker, api_request_factory): +def test_drf_oidc_bearer_token_auth_backend_success(keycloak_mock, api_request_factory): """ Checks successful login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() - kc_oidc_mock = mock_keycloak(mocker) - - refresh_token = sample_data.oidc_profile["refresh_token"] - access_token = sample_data.oidc_profile["access_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] + access_token = oidc_profile["access_token"] - decoded_token = kc_oidc_mock.decode_token(access_token) + decoded_token = keycloak_mock.decode_token(access_token) request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) - _check_authenticated_user(user, decoded_token, kc_oidc_mock) + _check_authenticated_user(user, decoded_token, keycloak_mock) # oidc_profile is not filled when authenticating through bearer token assert hasattr(user, "access_token") and user.access_token is None @pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_failure(mocker, api_request_factory): +def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_mock, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login). """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() + oidc_profile = keycloak_mock.login() + # simulate a failed authentication with a bearer token in expected format - mock_keycloak(mocker, auth_success=False) + keycloak_mock.set_auth_success(False) - refresh_token = sample_data.oidc_profile["refresh_token"] + refresh_token = oidc_profile["refresh_token"] request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # simulate a failed authentication with an invalid bearer token format request = api_request_factory.get( url, HTTP_AUTHORIZATION="Bearer invalid-token-format" ) with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) -def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory): +def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_mock, api_request_factory): """ Checks failed login based on OpenID Connect bearer token Django REST Framework authentication backend (Web API login) due to invalid authorization header value. """ url = reverse("api-1-stat-counters") drf_auth_backend = OIDCBearerTokenAuthentication() - refresh_token = sample_data.oidc_profile["refresh_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] # Invalid authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) # Missing authorization type request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}") with pytest.raises(AuthenticationFailed): drf_auth_backend.authenticate(request) @pytest.mark.django_db -def test_drf_oidc_bearer_token_auth_backend_permissions(mocker, api_request_factory): +def test_drf_oidc_bearer_token_auth_backend_permissions( + keycloak_mock, api_request_factory +): """ Checks that a permission defined with OpenID Connect is correctly mapped to a Django one when using bearer token authentication. """ permission = "webapp.some-permission" - mock_keycloak(mocker, user_permissions=[permission]) + keycloak_mock.user_permissions = [permission] drf_auth_backend = OIDCBearerTokenAuthentication() - refresh_token = sample_data.oidc_profile["refresh_token"] + oidc_profile = keycloak_mock.login() + refresh_token = oidc_profile["refresh_token"] url = reverse("api-1-stat-counters") request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}") user, _ = drf_auth_backend.authenticate(request) assert user.has_perm(permission) assert user.get_all_permissions() == {permission} assert user.get_group_permissions() == {permission} assert user.has_module_perms("webapp") assert not user.has_module_perms("foo") diff --git a/swh/web/tests/auth/test_middlewares.py b/swh/web/tests/auth/test_middlewares.py index e72f40b5..f6698143 100644 --- a/swh/web/tests/auth/test_middlewares.py +++ b/swh/web/tests/auth/test_middlewares.py @@ -1,60 +1,57 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from django.core.cache import cache from django.test import modify_settings from swh.web.common.utils import reverse from swh.web.tests.utils import check_html_get_response -from .keycloak_mock import mock_keycloak - @pytest.mark.django_db @modify_settings( MIDDLEWARE={"remove": ["swh.web.auth.middlewares.OIDCSessionExpiredMiddleware"]} ) -def test_oidc_session_expired_middleware_disabled(client, mocker): +def test_oidc_session_expired_middleware_disabled(client, keycloak_mock): # authenticate user - kc_oidc_mock = mock_keycloak(mocker) + client.login(code="", code_verifier="", redirect_uri="") - kc_oidc_mock.authorization_code.assert_called() + keycloak_mock.authorization_code.assert_called() url = reverse("swh-web-homepage") # visit url first to get user from response response = check_html_get_response(client, url, status_code=200) # simulate OIDC session expiration cache.delete(f"oidc_user_{response.wsgi_request.user.id}") # no redirection when session has expired check_html_get_response(client, url, status_code=200) @pytest.mark.django_db -def test_oidc_session_expired_middleware_enabled(client, mocker): +def test_oidc_session_expired_middleware_enabled(client, keycloak_mock): # authenticate user - kc_oidc_mock = mock_keycloak(mocker) client.login(code="", code_verifier="", redirect_uri="") - kc_oidc_mock.authorization_code.assert_called() + keycloak_mock.authorization_code.assert_called() url = reverse("swh-web-homepage") # visit url first to get user from response response = check_html_get_response(client, url, status_code=200) # simulate OIDC session expiration cache.delete(f"oidc_user_{response.wsgi_request.user.id}") # should redirect to logout page resp = check_html_get_response(client, url, status_code=302) silent_refresh_url = reverse( "logout", query_params={"next_path": url, "remote_user": 1} ) assert resp["location"] == silent_refresh_url diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index 11c0be23..8a6abcc4 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,518 +1,500 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from urllib.parse import urljoin, urlparse import uuid import pytest from django.contrib.auth.models import AnonymousUser, User from django.http import QueryDict from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID, decrypt_data from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.tests.django_asserts import assert_contains from swh.web.tests.utils import ( check_html_get_response, check_http_get_response, check_http_post_response, ) from swh.web.urls import _default_view as homepage_view -from . import sample_data -from .keycloak_mock import mock_keycloak - def _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri, scope="openid" ): parsed_url = urlparse(response["location"]) authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response["location"], parsed_url.path) == authorization_url assert "client_id" in query_dict assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict assert query_dict["redirect_uri"] == redirect_uri assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict assert query_dict["scope"] == scope assert "state" in query_dict assert "code_challenge" in query_dict # check a login_data has been registered in user session assert "login_data" in request.session login_data = request.session["login_data"] assert "code_verifier" in login_data assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] return login_data @pytest.mark.django_db -def test_oidc_login_views_success(client, mocker): +def test_oidc_login_views_success(client, keycloak_mock): """ Simulate a successful login authentication with OpenID Connect authorization code flow with PKCE. """ - # mock Keycloak client - kc_oidc_mock = mock_keycloak(mocker) - # user initiates login process login_url = reverse("oidc-login") # should redirect to Keycloak authentication page in order # for a user to login with its username / password response = check_html_get_response(client, login_url, status_code=302) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) login_data = _check_oidc_login_code_flow_data( request, response, - kc_oidc_mock, + keycloak_mock, redirect_uri=reverse("oidc-login-complete", request=request), ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-login-complete' view to # login in Django. # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) login_complete_url = reverse( "oidc-login-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) # login process finalization, should redirect to root url by default response = check_html_get_response(client, login_complete_url, status_code=302) request = response.wsgi_request assert response["location"] == request.build_absolute_uri("/") # user should be authenticated assert isinstance(request.user, OIDCUser) # check remote user has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db -def test_oidc_logout_view_success(client, mocker): +def test_oidc_logout_view_success(client, keycloak_mock): """ Simulate a successful logout operation with OpenID Connect. """ - # mock Keycloak client - kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") - kc_oidc_mock.authorization_code.assert_called() + keycloak_mock.authorization_code.assert_called() # user initiates logout oidc_logout_url = reverse("oidc-logout") # should redirect to logout page response = check_html_get_response(client, oidc_logout_url, status_code=302) request = response.wsgi_request logout_url = reverse("logout", query_params={"remote_user": 1}) assert response["location"] == request.build_absolute_uri(logout_url) # should have been logged out in Keycloak - kc_oidc_mock.logout.assert_called_with(sample_data.oidc_profile["refresh_token"]) + oidc_profile = keycloak_mock.login() + keycloak_mock.logout.assert_called_with(oidc_profile["refresh_token"]) # check effective logout in Django assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db -def test_oidc_login_view_failure(client, mocker): +def test_oidc_login_view_failure(client, keycloak_mock): """ Simulate a failed authentication with OpenID Connect. """ - # mock Keycloak client - mock_keycloak(mocker, auth_success=False) + keycloak_mock.set_auth_success(False) # user initiates login process login_url = reverse("oidc-login") # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) request = response.wsgi_request # no users should be logged in assert isinstance(request.user, AnonymousUser) # Simulate possible errors with OpenID Connect in the login complete view. def test_oidc_login_complete_view_no_login_data(client, mocker): # user initiates login process login_url = reverse("oidc-login-complete") # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) assert_contains( response, "Login process has not been initialized.", status_code=500 ) def test_oidc_login_complete_view_missing_parameters(client, mocker): # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", } session.save() # user initiates login process login_url = reverse("oidc-login-complete") # should render an error page response = check_html_get_response( client, login_url, status_code=400, template_used="error.html" ) request = response.wsgi_request assert_contains( response, "Missing query parameters for authentication.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) -def test_oidc_login_complete_wrong_csrf_token(client, mocker): - # mock Keycloak client - mock_keycloak(mocker) - +def test_oidc_login_complete_wrong_csrf_token(client, keycloak_mock): # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", } session.save() # user initiates login process login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"} ) # should render an error page response = check_html_get_response( client, login_url, status_code=400, template_used="error.html" ) request = response.wsgi_request assert_contains( response, "Wrong CSRF token, aborting login process.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db -def test_oidc_login_complete_wrong_code_verifier(client, mocker): - # mock Keycloak client - mock_keycloak(mocker, auth_success=False) +def test_oidc_login_complete_wrong_code_verifier(client, keycloak_mock): + keycloak_mock.set_auth_success(False) # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", } session.save() # check authentication error is reported login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": session["login_data"]["state"]}, ) # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) request = response.wsgi_request assert_contains(response, "User authentication failed.", status_code=500) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db -def test_oidc_logout_view_failure(client, mocker): +def test_oidc_logout_view_failure(client, keycloak_mock): """ Simulate a failed logout operation with OpenID Connect. """ - # mock Keycloak client - kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") err_msg = "Authentication server error" - kc_oidc_mock.logout.side_effect = Exception(err_msg) + keycloak_mock.logout.side_effect = Exception(err_msg) # user initiates logout process logout_url = reverse("oidc-logout") # should render an error page response = check_html_get_response( client, logout_url, status_code=500, template_used="error.html" ) request = response.wsgi_request assert_contains(response, err_msg, status_code=500) # user should be logged out from Django anyway assert isinstance(request.user, AnonymousUser) def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default assert not hasattr(request, "user") response = homepage_view(request) assert response.status_code == 200 def test_oidc_generate_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-generate-bearer-token") check_http_get_response(client, url, status_code=403) def _generate_and_test_bearer_token(client, kc_oidc_mock): # user authenticates client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) # user initiates bearer token generation flow url = reverse("oidc-generate-bearer-token") response = check_http_get_response(client, url, status_code=302) request = response.wsgi_request redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) # check login data and redirection to Keycloak is valid login_data = _check_oidc_login_code_flow_data( request, response, kc_oidc_mock, redirect_uri=redirect_uri, scope="openid offline_access", ) # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-generate-bearer-token-complete' view # to get and save bearer token # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) token_complete_url = reverse( "oidc-generate-bearer-token-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) nb_tokens = len(OIDCUserOfflineTokens.objects.all()) response = check_html_get_response(client, token_complete_url, status_code=302) request = response.wsgi_request # check token has been generated and saved encrypted to database assert len(OIDCUserOfflineTokens.objects.all()) == nb_tokens + 1 encrypted_token = OIDCUserOfflineTokens.objects.last().offline_token secret = get_config()["secret_key"].encode() salt = request.user.sub.encode() decrypted_token = decrypt_data(encrypted_token, secret, salt) oidc_profile = kc_oidc_mock.authorization_code(code=code, redirect_uri=redirect_uri) assert decrypted_token.decode("ascii") == oidc_profile["refresh_token"] # should redirect to tokens management Web UI assert response["location"] == reverse("oidc-profile") + "#tokens" return decrypted_token @pytest.mark.django_db -def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker): +def test_oidc_generate_bearer_token_authenticated_user_success(client, keycloak_mock): """ Authenticated user should be able to generate a bearer token using OIDC Authorization Code Flow. """ - kc_oidc_mock = mock_keycloak(mocker) - _generate_and_test_bearer_token(client, kc_oidc_mock) + _generate_and_test_bearer_token(client, keycloak_mock) def test_oidc_list_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) check_http_get_response(client, url, status_code=403) @pytest.mark.django_db -def test_oidc_list_bearer_tokens(client, mocker): +def test_oidc_list_bearer_tokens(client, keycloak_mock): """ User with correct credentials should be allowed to list his tokens. """ - kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 for _ in range(nb_tokens): - _generate_and_test_bearer_token(client, kc_oidc_mock) + _generate_and_test_bearer_token(client, keycloak_mock) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) response = check_http_get_response(client, url, status_code=200) tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"])) for oidc_token in OIDCUserOfflineTokens.objects.all(): assert ( oidc_token.creation_date.isoformat() == tokens_data[oidc_token.id - 1]["creation_date"] ) def test_oidc_get_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-get-bearer-token") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db -def test_oidc_get_bearer_token(client, mocker): +def test_oidc_get_bearer_token(client, keycloak_mock): """ User with correct credentials should be allowed to display a token. """ - kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 for i in range(nb_tokens): - token = _generate_and_test_bearer_token(client, kc_oidc_mock) + token = _generate_and_test_bearer_token(client, keycloak_mock) url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=200, data={"token_id": i + 1}, content_type="text/plain", ) assert response.content == token def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-revoke-bearer-tokens") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db -def test_oidc_revoke_bearer_tokens(client, mocker): +def test_oidc_revoke_bearer_tokens(client, keycloak_mock): """ User with correct credentials should be allowed to revoke tokens. """ - kc_oidc_mock = mock_keycloak(mocker) nb_tokens = 3 for _ in range(nb_tokens): - _generate_and_test_bearer_token(client, kc_oidc_mock) + _generate_and_test_bearer_token(client, keycloak_mock) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( client, url, status_code=200, data={"token_ids": [1]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 2 check_http_post_response( client, url, status_code=200, data={"token_ids": [2, 3]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 0 def test_oidc_profile_view_anonymous_user(client): """ Non authenticated users should be redirected to login page when requesting profile view. """ url = reverse("oidc-profile") login_url = reverse("oidc-login", query_params={"next_path": url}) resp = check_html_get_response(client, url, status_code=302) assert resp["location"] == login_url @pytest.mark.django_db -def test_oidc_profile_view(client, mocker): +def test_oidc_profile_view(client, keycloak_mock): """ Authenticated users should be able to request the profile page and link to Keycloak account UI should be present. """ url = reverse("oidc-profile") kc_config = get_config()["keycloak"] user_permissions = ["perm1", "perm2"] - mock_keycloak(mocker, user_permissions=user_permissions) + keycloak_mock.user_permissions = user_permissions client.login(code="", code_verifier="", redirect_uri="") resp = check_html_get_response( client, url, status_code=200, template_used="auth/profile.html" ) user = resp.wsgi_request.user kc_account_url = ( f"{kc_config['server_url']}realms/{kc_config['realm_name']}/account/" ) assert_contains(resp, kc_account_url) assert_contains(resp, user.username) assert_contains(resp, user.first_name) assert_contains(resp, user.last_name) assert_contains(resp, user.email) for perm in user_permissions: assert_contains(resp, perm) diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py index 30dc3647..b84632d9 100644 --- a/swh/web/tests/conftest.py +++ b/swh/web/tests/conftest.py @@ -1,358 +1,382 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import shutil from subprocess import PIPE, run import sys from typing import Any, Dict, List, Optional from hypothesis import HealthCheck from hypothesis import __version_info__ as hypothesis_version from hypothesis import settings import pytest from django.core.cache import cache from rest_framework.test import APIClient, APIRequestFactory +from swh.auth.pytest_plugin import keycloak_mock_factory from swh.model.hashutil import ALGORITHMS, hash_to_bytes from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest +from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.common import converters from swh.web.common.typing import OriginVisitInfo +from swh.web.config import get_config from swh.web.tests.data import get_tests_data, override_storages # Used to skip some tests ctags_json_missing = ( shutil.which("ctags") is None or b"+json" not in run(["ctags", "--version"], stdout=PIPE).stdout ) fossology_missing = shutil.which("nomossa") is None # Register some hypothesis profiles settings.register_profile("default", settings()) suppress_health_check = [HealthCheck.too_slow, HealthCheck.filter_too_much] if hypothesis_version >= (5, 49): suppress_health_check.append(HealthCheck.function_scoped_fixture) settings.register_profile( "swh-web", settings(deadline=None, suppress_health_check=suppress_health_check,), ) settings.register_profile( "swh-web-fast", settings( deadline=None, max_examples=1, suppress_health_check=suppress_health_check, ), ) def pytest_configure(config): # Use fast hypothesis profile by default if none has been # explicitly specified in pytest option if config.getoption("--hypothesis-profile") is None: settings.load_profile("swh-web-fast") # Small hack in order to be able to run the unit tests # without static assets generated by webpack. # Those assets are not really needed for the Python tests # but the django templates will fail to load due to missing # generated file webpack-stats.json describing the js and css # files to include. # So generate a dummy webpack-stats.json file to overcome # that issue. test_dir = os.path.dirname(__file__) # location of the static folder when running tests through tox static_dir = os.path.join(sys.prefix, "share/swh/web/static") if not os.path.exists(static_dir): # location of the static folder when running tests locally with pytest static_dir = os.path.join(test_dir, "../../../static") webpack_stats = os.path.join(static_dir, "webpack-stats.json") if os.path.exists(webpack_stats): return bundles_dir = os.path.join(test_dir, "../assets/src/bundles") _, dirs, _ = next(os.walk(bundles_dir)) mock_webpack_stats = {"status": "done", "publicPath": "/static", "chunks": {}} for bundle in dirs: asset = "js/%s.js" % bundle mock_webpack_stats["chunks"][bundle] = [ { "name": asset, "publicPath": "/static/%s" % asset, "path": os.path.join(static_dir, asset), } ] with open(webpack_stats, "w") as outfile: json.dump(mock_webpack_stats, outfile) # Clear Django cache before each test @pytest.fixture(autouse=True) def django_cache_cleared(): cache.clear() # Alias rf fixture from pytest-django @pytest.fixture def request_factory(rf): return rf # Fixture to get test client from Django REST Framework @pytest.fixture(scope="module") def api_client(): return APIClient() # Fixture to get API request factory from Django REST Framework @pytest.fixture(scope="module") def api_request_factory(): return APIRequestFactory() # Initialize tests data @pytest.fixture(scope="session", autouse=True) def tests_data(): data = get_tests_data(reset=True) # Update swh-web configuration to use the in-memory storages # instantiated in the tests.data module override_storages(data["storage"], data["idx_storage"], data["search"]) return data # Fixture to manipulate data from a sample archive used in the tests @pytest.fixture(scope="session") def archive_data(tests_data): return _ArchiveData(tests_data) # Fixture to manipulate indexer data from a sample archive used in the tests @pytest.fixture(scope="session") def indexer_data(tests_data): return _IndexerData(tests_data) # Custom data directory for requests_mock @pytest.fixture def datadir(): return os.path.join(os.path.abspath(os.path.dirname(__file__)), "resources") class _ArchiveData: """ Helper class to manage data from a sample test archive. It is initialized with a reference to an in-memory storage containing raw tests data. It is basically a proxy to Storage interface but it overrides some methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.storage = tests_data["storage"] def __getattr__(self, key): if key == "storage": raise AttributeError(key) # Forward calls to non overridden Storage methods to wrapped # storage instance return getattr(self.storage, key) def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]: cnt_ids_bytes = { algo_hash: hash_to_bytes(content[algo_hash]) for algo_hash in ALGORITHMS if content.get(algo_hash) } cnt = self.storage.content_find(cnt_ids_bytes) return converters.from_content(cnt[0].to_dict()) if cnt else cnt def content_get(self, cnt_id: str) -> Dict[str, Any]: cnt_id_bytes = hash_to_bytes(cnt_id) content = self.storage.content_get([cnt_id_bytes])[0] if content: content_d = content.to_dict() content_d.pop("ctime", None) else: content_d = None return converters.from_swh( content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"} ) def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]: cnt_id_bytes = hash_to_bytes(cnt_id) cnt_data = self.storage.content_get_data(cnt_id_bytes) if cnt_data is None: return None return converters.from_content({"data": cnt_data, "sha1": cnt_id_bytes}) def directory_get(self, dir_id): return {"id": dir_id, "content": self.directory_ls(dir_id)} def directory_ls(self, dir_id): cnt_id_bytes = hash_to_bytes(dir_id) dir_content = map( converters.from_directory_entry, self.storage.directory_ls(cnt_id_bytes) ) return list(dir_content) def release_get(self, rel_id: str) -> Optional[Dict[str, Any]]: rel_id_bytes = hash_to_bytes(rel_id) rel_data = self.storage.release_get([rel_id_bytes])[0] return converters.from_release(rel_data) if rel_data else None def revision_get(self, rev_id: str) -> Optional[Dict[str, Any]]: rev_id_bytes = hash_to_bytes(rev_id) rev_data = self.storage.revision_get([rev_id_bytes])[0] return converters.from_revision(rev_data) if rev_data else None def revision_log(self, rev_id, limit=None): rev_id_bytes = hash_to_bytes(rev_id) return list( map( converters.from_revision, self.storage.revision_log([rev_id_bytes], limit=limit), ) ) def snapshot_get_latest(self, origin_url): snp = snapshot_get_latest(self.storage, origin_url) return converters.from_snapshot(snp.to_dict()) def origin_get(self, origin_urls): origins = self.storage.origin_get(origin_urls) return [converters.from_origin(o.to_dict()) for o in origins] def origin_visit_get(self, origin_url): next_page_token = None visits = [] while True: visit_page = self.storage.origin_visit_get( origin_url, page_token=next_page_token ) next_page_token = visit_page.next_page_token for visit in visit_page.results: visit_status = self.storage.origin_visit_status_get_latest( origin_url, visit.visit ) visits.append( converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) ) if not next_page_token: break return visits def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo: visit = self.storage.origin_visit_get_by(origin_url, visit_id) assert visit is not None visit_status = self.storage.origin_visit_status_get_latest(origin_url, visit_id) assert visit_status is not None return converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) def origin_visit_status_get_latest( self, origin_url, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ): visit_status = origin_get_latest_visit_status( self.storage, origin_url, type=type, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, ) return ( converters.from_origin_visit(visit_status.to_dict()) if visit_status else None ) def snapshot_get(self, snapshot_id): snp = snapshot_get_all_branches(self.storage, hash_to_bytes(snapshot_id)) return converters.from_snapshot(snp.to_dict()) def snapshot_get_branches( self, snapshot_id, branches_from="", branches_count=1000, target_types=None ): partial_branches = self.storage.snapshot_get_branches( hash_to_bytes(snapshot_id), branches_from.encode(), branches_count, target_types, ) return converters.from_partial_branches(partial_branches) def snapshot_get_head(self, snapshot): if snapshot["branches"]["HEAD"]["target_type"] == "alias": target = snapshot["branches"]["HEAD"]["target"] head = snapshot["branches"][target]["target"] else: head = snapshot["branches"]["HEAD"]["target"] return head def snapshot_count_branches(self, snapshot_id): counts = dict.fromkeys(("alias", "release", "revision"), 0) counts.update(self.storage.snapshot_count_branches(hash_to_bytes(snapshot_id))) counts.pop(None, None) return counts class _IndexerData: """ Helper class to manage indexer tests data It is initialized with a reference to an in-memory indexer storage containing raw tests data. It also defines class methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.idx_storage = tests_data["idx_storage"] self.mimetype_indexer = tests_data["mimetype_indexer"] self.license_indexer = tests_data["license_indexer"] self.ctags_indexer = tests_data["ctags_indexer"] def content_add_mimetype(self, cnt_id): self.mimetype_indexer.run([hash_to_bytes(cnt_id)]) def content_get_mimetype(self, cnt_id): mimetype = self.idx_storage.content_mimetype_get([hash_to_bytes(cnt_id)])[ 0 ].to_dict() return converters.from_filetype(mimetype) def content_add_license(self, cnt_id): self.license_indexer.run([hash_to_bytes(cnt_id)]) def content_get_license(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) licenses = self.idx_storage.content_fossology_license_get([cnt_id_bytes]) for license in licenses: yield converters.from_swh(license.to_dict(), hashess={"id"}) def content_add_ctags(self, cnt_id): self.ctags_indexer.run([hash_to_bytes(cnt_id)]) def content_get_ctags(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) ctags = self.idx_storage.content_ctags_get([cnt_id_bytes]) for ctag in ctags: yield converters.from_swh(ctag, hashess={"id"}) + + +_keycloak_config = get_config()["keycloak"] + +_keycloak_mock = keycloak_mock_factory( + server_url=_keycloak_config["server_url"], + realm_name=_keycloak_config["realm_name"], + client_id=OIDC_SWH_WEB_CLIENT_ID, +) + + +@pytest.fixture +def keycloak_mock(_keycloak_mock, mocker): + for oidc_client_factory in ( + "swh.web.auth.views.get_oidc_client", + "swh.web.auth.backends.get_oidc_client", + ): + mock_get_oidc_client = mocker.patch(oidc_client_factory) + mock_get_oidc_client.return_value = _keycloak_mock + + return _keycloak_mock