diff --git a/MANIFEST.in b/MANIFEST.in index 807e2e9..e1818b6 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,5 +1,6 @@ include Makefile include requirements*.txt include version.txt include README.md recursive-include swh py.typed +include conftest.py diff --git a/PKG-INFO b/PKG-INFO index 7b777fe..a5409e0 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,27 +1,27 @@ Metadata-Version: 2.1 Name: swh.auth -Version: 0.0.1 +Version: 0.1.0 Summary: Software Heritage Authentication Home-page: https://forge.softwareheritage.org/source/swh-auth/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh- Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-/ Description: swh-auth ========== Authentication library for SWH (keycloak common utilities) Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..99ecebb --- /dev/null +++ b/conftest.py @@ -0,0 +1,6 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +pytest_plugins = ["swh.auth.pytest_plugin"] diff --git a/swh.auth.egg-info/PKG-INFO b/swh.auth.egg-info/PKG-INFO index 7b777fe..a5409e0 100644 --- a/swh.auth.egg-info/PKG-INFO +++ b/swh.auth.egg-info/PKG-INFO @@ -1,27 +1,27 @@ Metadata-Version: 2.1 Name: swh.auth -Version: 0.0.1 +Version: 0.1.0 Summary: Software Heritage Authentication Home-page: https://forge.softwareheritage.org/source/swh-auth/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh- Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-/ Description: swh-auth ========== Authentication library for SWH (keycloak common utilities) Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.auth.egg-info/SOURCES.txt b/swh.auth.egg-info/SOURCES.txt index d6d0b7e..4b354fb 100644 --- a/swh.auth.egg-info/SOURCES.txt +++ b/swh.auth.egg-info/SOURCES.txt @@ -1,38 +1,39 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile README.md +conftest.py mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.auth.egg-info/PKG-INFO swh.auth.egg-info/SOURCES.txt swh.auth.egg-info/dependency_links.txt swh.auth.egg-info/requires.txt swh.auth.egg-info/top_level.txt swh/auth/__init__.py swh/auth/cli.py swh/auth/keycloak.py swh/auth/py.typed +swh/auth/pytest_plugin.py swh/auth/tests/__init__.py -swh/auth/tests/conftest.py swh/auth/tests/sample_data.py swh/auth/tests/test_keycloak.py \ No newline at end of file diff --git a/swh/auth/keycloak.py b/swh/auth/keycloak.py index 53e8601..72d243c 100644 --- a/swh/auth/keycloak.py +++ b/swh/auth/keycloak.py @@ -1,140 +1,160 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Optional from urllib.parse import urlencode from keycloak import KeycloakOpenID class KeycloakOpenIDConnect: """ Wrapper class around python-keycloak to ease the interaction with Keycloak for managing authentication and user permissions with OpenID Connect. """ def __init__( self, server_url: str, realm_name: str, client_id: str, realm_public_key: str = "", ): """ Args: server_url: URL of the Keycloak server realm_name: The realm name client_id: The OpenID Connect client identifier realm_public_key: The realm public key (will be dynamically retrieved if not provided) """ self._keycloak = KeycloakOpenID( server_url=server_url, client_id=client_id, realm_name=realm_name, ) self.server_url = server_url self.realm_name = realm_name self.client_id = client_id self.realm_public_key = realm_public_key def well_known(self) -> Dict[str, Any]: """ Retrieve the OpenID Connect Well-Known URI registry from Keycloak. Returns: A dictionary filled with OpenID Connect URIS. """ return self._keycloak.well_know() def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: """ Get OpenID Connect authorization URL to authenticate users. Args: redirect_uri: URI to redirect to once a user is authenticated extra_params: Extra query parameters to add to the authorization URL """ auth_url = self._keycloak.auth_url(redirect_uri) if extra_params: auth_url += "&%s" % urlencode(extra_params) return auth_url def authorization_code( self, code: str, redirect_uri: str, **extra_params: str ) -> Dict[str, Any]: """ Get OpenID Connect authentication tokens using Authorization Code flow. Args: code: Authorization code provided by Keycloak redirect_uri: URI to redirect to once a user is authenticated (must be the same as the one provided to authorization_url): extra_params: Extra parameters to add in the authorization request payload. """ return self._keycloak.token( grant_type="authorization_code", code=code, redirect_uri=redirect_uri, **extra_params, ) + def login( + self, username: str, password: str, **extra_params: str + ) -> Dict[str, Any]: + """ + Get OpenID Connect authentication tokens using Direct Access Grant flow. + + Args: + username: an existing username in the realm + password: password associated to username + extra_params: Extra parameters to add in the authorization request + payload. + """ + return self._keycloak.token( + grant_type="password", + scope="openid", + username=username, + password=password, + **extra_params, + ) + def refresh_token(self, refresh_token: str) -> Dict[str, Any]: """ Request a new access token from Keycloak using a refresh token. Args: refresh_token: A refresh token provided by Keycloak Returns: A dictionary filled with tokens info """ return self._keycloak.refresh_token(refresh_token) def decode_token( self, token: str, options: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Try to decode a JWT token. Args: token: A JWT token to decode options: Options for jose.jwt.decode Returns: A dictionary filled with decoded token content """ if not self.realm_public_key: realm_public_key = self._keycloak.public_key() self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" self.realm_public_key += realm_public_key self.realm_public_key += "\n-----END PUBLIC KEY-----" return self._keycloak.decode_token( token, key=self.realm_public_key, options=options ) def logout(self, refresh_token: str) -> None: """ Logout a user by closing its authenticated session. Args: refresh_token: A refresh token provided by Keycloak """ self._keycloak.logout(refresh_token) def userinfo(self, access_token: str) -> Dict[str, Any]: """ Return user information from its access token. Args: access_token: An access token provided by Keycloak Returns: A dictionary fillled with user information """ return self._keycloak.userinfo(access_token) diff --git a/swh/auth/tests/conftest.py b/swh/auth/pytest_plugin.py similarity index 64% rename from swh/auth/tests/conftest.py rename to swh/auth/pytest_plugin.py index f5cd5b9..f4dc495 100644 --- a/swh/auth/tests/conftest.py +++ b/swh/auth/pytest_plugin.py @@ -1,157 +1,180 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from datetime import datetime, timezone -from typing import Optional +from typing import Dict, List, Optional from unittest.mock import Mock from keycloak.exceptions import KeycloakError import pytest from swh.auth.keycloak import KeycloakOpenIDConnect - -from .sample_data import ( - OIDC_PROFILE, - RAW_REALM_PUBLIC_KEY, - REALM, - SERVER_URL, - USER_INFO, -) +from swh.auth.tests.sample_data import OIDC_PROFILE, RAW_REALM_PUBLIC_KEY, USER_INFO class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect): """Mock KeycloakOpenIDConnect class to allow testing Args: + server_url: Server main auth url (cf. + :py:data:`swh.auth.tests.sample_data.SERVER_URL`) + realm_name: Realm (cf. :py:data:`swh.auth.tests.sample_data.REALM_NAME`) + client_id: Client id (cf. :py:data:`swh.auth.tests.sample_data.CLIENT_ID`) auth_success: boolean flag to simulate authentication success or failure - exp: expiration + exp: expiration delay user_groups: user groups configuration (if any) user_permissions: user permissions configuration (if any) + oidc_profile: Dict response from a call to a token authentication query (cf. + :py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`) + user_info: Dict response from a call to userinfo query (cf. + :py:data:`swh.auth.tests.sample_data.USER_INFO`) + raw_realm_public_key: A raw ascii text representing the realm public key (cf. + :py:data:`swh.auth.tests.sample_data.RAW_REALM_PUBLIC_KEY`) """ def __init__( self, server_url: str, realm_name: str, client_id: str, auth_success: bool = True, exp: Optional[int] = None, - user_groups=[], - user_permissions=[], + user_groups: List[str] = [], + user_permissions: List[str] = [], + oidc_profile: Dict = OIDC_PROFILE, + user_info: Dict = USER_INFO, + raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): super().__init__( server_url=server_url, realm_name=realm_name, client_id=client_id ) self.exp = exp self.user_groups = user_groups self.user_permissions = user_permissions - self._keycloak.public_key = lambda: RAW_REALM_PUBLIC_KEY + self._keycloak.public_key = lambda: raw_realm_public_key self._keycloak.well_know = lambda: { "issuer": f"{self.server_url}realms/{self.realm_name}", "authorization_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/auth" ), "token_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/token" ), "token_introspection_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/token/" "introspect" ), "userinfo_endpoint": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/userinfo" ), "end_session_endpoint": ( f"{self.server_url}realms/" f"{self.realm_name}/protocol/" "openid-connect/logout" ), "jwks_uri": ( f"{self.server_url}realms/{self.realm_name}/" "protocol/openid-connect/certs" ), } - self.set_auth_success(auth_success) + self.set_auth_success(auth_success, oidc_profile, user_info) def decode_token(self, token): options = {} if self.auth_success: - # skip signature expiration check as we use a static oidc_profile - # for the tests with expired tokens in it + # skip signature expiration and audience checks as we use a static + # oidc_profile for the tests with expired tokens in it options["verify_exp"] = False + options["verify_aud"] = False decoded = super().decode_token(token, options) # tweak auth and exp time for tests expire_in = decoded["exp"] - decoded["auth_time"] if self.exp is not None: decoded["exp"] = self.exp decoded["auth_time"] = self.exp - expire_in else: decoded["auth_time"] = int(datetime.now(tz=timezone.utc).timestamp()) decoded["exp"] = decoded["auth_time"] + expire_in decoded["groups"] = self.user_groups + decoded["aud"] = [self.client_id, "account"] + decoded["azp"] = self.client_id if self.user_permissions: decoded["resource_access"][self.client_id] = { "roles": self.user_permissions } return decoded - def set_auth_success(self, auth_success: bool) -> None: + def set_auth_success( + self, + auth_success: bool, + oidc_profile: Optional[Dict] = None, + user_info: Optional[Dict] = None, + ) -> None: # following type ignore because mypy is not too happy about affecting mock to # method "Cannot assign to a method affecting mock". Ignore for now. self.authorization_code = Mock() # type: ignore self.refresh_token = Mock() # type: ignore + self.login = Mock() # type: ignore self.userinfo = Mock() # type: ignore self.logout = Mock() # type: ignore self.auth_success = auth_success if auth_success: - self.authorization_code.return_value = copy(OIDC_PROFILE) - self.refresh_token.return_value = copy(OIDC_PROFILE) - self.userinfo.return_value = copy(USER_INFO) + self.authorization_code.return_value = copy(oidc_profile) + self.refresh_token.return_value = copy(oidc_profile) + self.login.return_value = copy(oidc_profile) + self.userinfo.return_value = copy(user_info) else: self.authorization_url = Mock() # type: ignore exception = KeycloakError( error_message="Authentication failed", response_code=401 ) self.authorization_code.side_effect = exception self.authorization_url.side_effect = exception self.refresh_token.side_effect = exception self.userinfo.side_effect = exception self.logout.side_effect = exception def keycloak_mock_factory( - server_url=SERVER_URL, - realm_name=REALM, - client_id="swh-client-id", - auth_success=True, - exp=None, - user_groups=[], - user_permissions=[], + server_url: str, + realm_name: str, + client_id: str, + auth_success: bool = True, + exp: Optional[int] = None, + user_groups: List[str] = [], + user_permissions: List[str] = [], + oidc_profile: Dict = OIDC_PROFILE, + user_info: Dict = USER_INFO, + raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY, ): - """Keycloak mock fixture factory + """Keycloak mock fixture factory. Report to + :py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock` docstring. """ @pytest.fixture def keycloak_open_id_connect(): return KeycloackOpenIDConnectMock( server_url=server_url, realm_name=realm_name, client_id=client_id, auth_success=auth_success, exp=exp, user_groups=user_groups, user_permissions=user_permissions, + oidc_profile=oidc_profile, + user_info=user_info, + raw_realm_public_key=raw_realm_public_key, ) return keycloak_open_id_connect diff --git a/swh/auth/tests/sample_data.py b/swh/auth/tests/sample_data.py index 3e20e14..060985f 100644 --- a/swh/auth/tests/sample_data.py +++ b/swh/auth/tests/sample_data.py @@ -1,142 +1,134 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information SERVER_URL = "http://keycloak:8080/keycloak/auth/" -REALM = "SoftwareHeritage" -CLIENT_ID = "swh-web" +REALM_NAME = "SoftwareHeritage" +CLIENT_ID = "client-id" # Decoded token (out of the access token) DECODED_TOKEN = { "jti": "31fc50b7-bbe5-4f51-91ef-8e3eec51331e", "exp": 1614787019, "nbf": 0, "iat": 1582723101, "iss": "http://localhost:8080/auth/realms/SoftwareHeritage", "aud": [CLIENT_ID, "account"], "sub": "feacd344-b468-4a65-a236-14f61e6b7200", "typ": "Bearer", "azp": CLIENT_ID, "auth_time": 1614786418, "session_state": "d82b90d1-0a94-4e74-ad66-dd95341c7b6d", "acr": "1", "allowed-origins": ["*"], "realm_access": {"roles": ["offline_access", "uma_authorization"]}, "resource_access": { "account": {"roles": ["manage-account", "manage-account-links", "view-profile"]} }, "scope": "openid email profile", "email_verified": False, "name": "John Doe", "groups": [], "preferred_username": "johndoe", "given_name": "John", "family_name": "Doe", "email": "john.doe@example.com", } # Authentication response is an oidc profile dict OIDC_PROFILE = { "access_token": ( "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhV" "Q0p0TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0." "eyJqdGkiOiIzMWZjNTBiNy1iYmU1LTRmNTEtOTFlZi04ZTNlZWM1MTMz" "MWUiLCJleHAiOjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIz" "MTAxLCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFs" "bXMvU29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6WyJzd2gtd2ViIiwiYWNj" "b3VudCJdLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRhNjUtYTIzNi0xNGY2" "MWU2YjcyMDAiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJzd2gtd2ViIiwi" "YXV0aF90aW1lIjoxNTgyNzIzMTAwLCJzZXNzaW9uX3N0YXRlIjoiZDgy" "YjkwZDEtMGE5NC00ZTc0LWFkNjYtZGQ5NTM0MWM3YjZkIiwiYWNyIjoi" "MSIsImFsbG93ZWQtb3JpZ2lucyI6WyIqIl0sInJlYWxtX2FjY2VzcyI6" "eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0" "aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xl" "cyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtz" "Iiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwg" "cHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwibmFtZSI6Ikpv" "aG4gRG9lIiwiZ3JvdXBzIjpbXSwicHJlZmVycmVkX3VzZXJuYW1lIjoi" "am9obmRvZSIsImdpdmVuX25hbWUiOiJKb2huIiwiZmFtaWx5X25hbWUi" "OiJEb2UiLCJlbWFpbCI6ImpvaG4uZG9lQGV4YW1wbGUuY29tIn0.neJ-" "Pmd87J6Gt0fzDqmXFeoy34Iqb5vNNEEgIKqtqg3moaVkbXrO_9R37DJB" "AgdFv0owVONK3GbqPOEICePgG6RFtri999DetNE-O5sB4fwmHPWcHPlO" "kcPLbVJqu6zWo-2AzlfAy5bCNvj_wzs2tjFjLeHcRgR1a1WY3uTp5EWc" "HITCWQZzZWFGZTZCTlGkpdyJTqxGBdSHRB4NlIVGpYSTBsBsxttFEetl" "rpcNd4-5AteFprIr9hn9VasIIF8WdFdtC2e8xGMJW5Q0M3G3Iu-LLNmE" "oTIDqtbJ7OrIcGBIwsc3seCV3eCG6kOYwz5w-f8DeOpwcDX58yYPmapJ" "6A" ), "expires_in": 600, "id_token": ( "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhVQ0p0" "TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.eyJqdGki" "OiI0NDRlYzU1My1iYzhiLTQ2YjYtOTlmYS0zOTc3YTJhZDY1ZmEiLCJleHAi" "OjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxLCJpc3MiOiJo" "dHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMvU29mdHdhcmVIZXJp" "dGFnZSIsImF1ZCI6InN3aC13ZWIiLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRh" "NjUtYTIzNi0xNGY2MWU2YjcyMDAiLCJ0eXAiOiJJRCIsImF6cCI6InN3aC13" "ZWIiLCJhdXRoX3RpbWUiOjE1ODI3MjMxMDAsInNlc3Npb25fc3RhdGUiOiJk" "ODJiOTBkMS0wYTk0LTRlNzQtYWQ2Ni1kZDk1MzQxYzdiNmQiLCJhY3IiOiIx" "IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJuYW1lIjoiSm9obiBEb2UiLCJn" "cm91cHMiOltdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2" "ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoi" "am9obi5kb2VAZXhhbXBsZS5jb20ifQ.YB7bxlz_wgLJSkylVjmqedxQgEMee" "JOdi9CFHXV4F3ZWsEZ52CGuJXsozkX2oXvgU06MzzLNEK8ojgrPSNzjRkutL" "aaLq_YUzv4iV8fmKUS_aEyiYZbfoBe3Y4dwv2FoPEPCt96iTwpzM5fg_oYw_" "PHCq-Yl5SulT1nTrJZpntkf0hRjmxlDO06JMp0aZ8xS8RYJqH48xCRf_DARE" "0jJV2-UuzOWI6xBATwFfP44kV6wFmErLN5txMgwZzCSB2OCe5Cl1il0eTQTN" "ybeSYZeZE61QtuTRUHeP1D1qSbJGy5g_S67SdTkS-hQFvfrrD84qGflIEqnX" "ZbYnitD1Typ6Q" ), "not-before-policy": 0, "refresh_expires_in": 1800, "refresh_token": ( "eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmNjM" "zMDE5MS01YTU4LTQxMDAtOGIzYS00ZDdlM2U1NjA3MTgifQ.eyJqdGk" "iOiIxYWI5ZWZmMS0xZWZlLTQ3MDMtOGQ2YS03Nzg1NWUwYzQyYTYiLC" "JleHAiOjE1ODI3MjQ5MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxL" "CJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMv" "U29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6Imh0dHA6Ly9sb2NhbGhvc3Q" "6ODA4MC9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwic3ViIj" "oiZmVhY2QzNDQtYjQ2OC00YTY1LWEyMzYtMTRmNjFlNmI3MjAwIiwid" "HlwIjoiUmVmcmVzaCIsImF6cCI6InN3aC13ZWIiLCJhdXRoX3RpbWUi" "OjAsInNlc3Npb25fc3RhdGUiOiJkODJiOTBkMS0wYTk0LTRlNzQtYWQ" "2Ni1kZDk1MzQxYzdiNmQiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOl" "sib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwic" "mVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFu" "YWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXc" "tcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGVtYWlsIHByb2ZpbG" "UifQ.xQYrl2CMP_GQ_TFqhsTz-rTs3WuZz5I37toi1eSsDMI" ), "scope": "openid email profile", "session_state": "d82b90d1-0a94-4e74-ad66-dd95341c7b6d", "token_type": "bearer", } USER_INFO = { "email": "john.doe@example.com", "email_verified": False, "family_name": "Doe", "given_name": "John", "groups": ["/staff"], "name": "John Doe", "preferred_username": "johndoe", "sub": "feacd344-b468-4a65-a236-14f61e6b7200", } RAW_REALM_PUBLIC_KEY = ( "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnqF4xvGjaI54P6WtJvyGayxP8A93u" "NcA3TH6jitwmyAalj8dN8/NzK9vrdlSA3Ibvp/XQujPSOP7a35YiYFscEJnogTXQpE/FhZrUY" "y21U6ezruVUv4z/ER1cYLb+q5ZI86nXSTNCAbH+lw7rQjlvcJ9KvgHEeA5ALXJ1r55zUmNvuy" "5o6ke1G3fXbNSXwF4qlWAzo1o7Ms8qNrNyOG8FPx24dvm9xMH7/08IPvh9KUqlnP8h6olpxHr" "drX/q4E+Nzj8Tr8p7Z5CimInls40QuOTIhs6C2SwFHUgQgXl9hB9umiZJlwYEpDv0/LO2zYie" "Hl5Lv7Iig4FOIXIVCaDGQIDAQAB" ) - -REALM_PUBLIC_KEY = { - "realm": REALM, - "public_key": RAW_REALM_PUBLIC_KEY, - "token-service": f"{SERVER_URL}realms/{REALM}/protocol/openid-connect", - "account-service": f"{SERVER_URL}realms/{REALM}/account", - "tokens-not-before": 0, -} diff --git a/swh/auth/tests/test_keycloak.py b/swh/auth/tests/test_keycloak.py index 19226f3..5cf774e 100644 --- a/swh/auth/tests/test_keycloak.py +++ b/swh/auth/tests/test_keycloak.py @@ -1,86 +1,100 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import copy from urllib.parse import parse_qs, urlparse from keycloak.exceptions import KeycloakError import pytest -from swh.auth.tests.conftest import keycloak_mock_factory -from swh.auth.tests.sample_data import CLIENT_ID, DECODED_TOKEN, OIDC_PROFILE, USER_INFO - -# dataset we have here is bound to swh-web -keycloak_mock = keycloak_mock_factory(client_id=CLIENT_ID) +from swh.auth.pytest_plugin import keycloak_mock_factory +from swh.auth.tests.sample_data import ( + CLIENT_ID, + DECODED_TOKEN, + OIDC_PROFILE, + REALM_NAME, + SERVER_URL, + USER_INFO, +) + +# Make keycloak fixture to use for tests below. +keycloak_mock = keycloak_mock_factory( + server_url=SERVER_URL, realm_name=REALM_NAME, client_id=CLIENT_ID, +) def test_keycloak_well_known(keycloak_mock): well_known_result = keycloak_mock.well_known() assert set(well_known_result.keys()) == { "issuer", "authorization_endpoint", "token_endpoint", "userinfo_endpoint", "end_session_endpoint", "jwks_uri", "token_introspection_endpoint", } def test_keycloak_authorization_url(keycloak_mock): actual_auth_uri = keycloak_mock.authorization_url("http://redirect-uri", foo="bar") expected_auth_url = keycloak_mock.well_known()["authorization_endpoint"] parsed_result = urlparse(actual_auth_uri) assert expected_auth_url.endswith(parsed_result.path) parsed_query = parse_qs(parsed_result.query) assert parsed_query == { "client_id": [CLIENT_ID], "response_type": ["code"], "redirect_uri": ["http://redirect-uri"], "foo": ["bar"], } def test_keycloak_authorization_code_fail(keycloak_mock): "Authorization failure raise error" # Simulate failed authentication with Keycloak keycloak_mock.set_auth_success(False) with pytest.raises(KeycloakError): keycloak_mock.authorization_code("auth-code", "redirect-uri") def test_keycloak_authorization_code(keycloak_mock): actual_response = keycloak_mock.authorization_code("auth-code", "redirect-uri") assert actual_response == OIDC_PROFILE def test_keycloak_refresh_token(keycloak_mock): actual_result = keycloak_mock.refresh_token("refresh-token") assert actual_result == OIDC_PROFILE def test_keycloak_userinfo(keycloak_mock): actual_user_info = keycloak_mock.userinfo("refresh-token") assert actual_user_info == USER_INFO def test_keycloak_logout(keycloak_mock): """Login out does not raise""" keycloak_mock.logout("refresh-token") def test_keycloak_decode_token(keycloak_mock): actual_decoded_data = keycloak_mock.decode_token(OIDC_PROFILE["access_token"]) actual_decoded_data2 = copy(actual_decoded_data) expected_decoded_token = copy(DECODED_TOKEN) for dynamic_valued_key in ["exp", "auth_time"]: actual_decoded_data2.pop(dynamic_valued_key) expected_decoded_token.pop(dynamic_valued_key) assert actual_decoded_data2 == expected_decoded_token + + +def test_keycloak_login(keycloak_mock): + actual_response = keycloak_mock.login("username", "password") + assert actual_response == OIDC_PROFILE