Page Menu | Home | Software Heritage

No One | Temporary

diff --git a/swh/auth/keycloak.py b/swh/auth/keycloak.py
index 53e8601..72d243c 100644
--- a/swh/auth/keycloak.py
+++ b/swh/auth/keycloak.py
@@ -1,140 +1,160 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict, Optional
from urllib.parse import urlencode
from keycloak import KeycloakOpenID
class KeycloakOpenIDConnect:
    """
    Wrapper class around python-keycloak to ease the interaction with Keycloak
    for managing authentication and user permissions with OpenID Connect.
    """

    def __init__(
        self,
        server_url: str,
        realm_name: str,
        client_id: str,
        realm_public_key: str = "",
    ):
        """
        Args:
            server_url: URL of the Keycloak server
            realm_name: The realm name
            client_id: The OpenID Connect client identifier
            realm_public_key: The realm public key (will be dynamically
                retrieved if not provided)
        """
        self._keycloak = KeycloakOpenID(
            server_url=server_url, client_id=client_id, realm_name=realm_name,
        )
        self.server_url = server_url
        self.realm_name = realm_name
        self.client_id = client_id
        self.realm_public_key = realm_public_key

    def well_known(self) -> Dict[str, Any]:
        """
        Retrieve the OpenID Connect Well-Known URI registry from Keycloak.

        Returns:
            A dictionary filled with OpenID Connect URIs.
        """
        # NOTE: ``well_know`` (sic) is the method name called on the wrapped
        # client; it is not a typo introduced here.
        return self._keycloak.well_know()

    def authorization_url(self, redirect_uri: str, **extra_params: str) -> str:
        """
        Get OpenID Connect authorization URL to authenticate users.

        Args:
            redirect_uri: URI to redirect to once a user is authenticated
            extra_params: Extra query parameters to add to the
                authorization URL

        Returns:
            The authorization URL, with ``extra_params`` appended to its
            query string when any are given.
        """
        auth_url = self._keycloak.auth_url(redirect_uri)
        if extra_params:
            # The URL built by the client is extended with "&", i.e. it is
            # expected to already carry a query string.
            auth_url += "&%s" % urlencode(extra_params)
        return auth_url

    def authorization_code(
        self, code: str, redirect_uri: str, **extra_params: str
    ) -> Dict[str, Any]:
        """
        Get OpenID Connect authentication tokens using Authorization
        Code flow.

        Args:
            code: Authorization code provided by Keycloak
            redirect_uri: URI to redirect to once a user is authenticated
                (must be the same as the one provided to authorization_url)
            extra_params: Extra parameters to add in the authorization request
                payload.

        Returns:
            The result of the token request issued through the wrapped client.
        """
        return self._keycloak.token(
            grant_type="authorization_code",
            code=code,
            redirect_uri=redirect_uri,
            **extra_params,
        )

    def login(
        self,
        username: str,
        password: str,
        scope: str = "openid",
        **extra_params: str,
    ) -> Dict[str, Any]:
        """
        Get OpenID Connect authentication tokens using Direct Access Grant flow.

        Args:
            username: an existing username in the realm
            password: password associated to username
            scope: value sent as the ``scope`` of the token request; defaults
                to ``"openid"``. It is a named parameter so that callers can
                override it without colliding with ``extra_params``.
            extra_params: Extra parameters to add in the authorization request
                payload.

        Returns:
            The result of the token request issued through the wrapped client.
        """
        return self._keycloak.token(
            grant_type="password",
            scope=scope,
            username=username,
            password=password,
            **extra_params,
        )

    def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
        """
        Request a new access token from Keycloak using a refresh token.

        Args:
            refresh_token: A refresh token provided by Keycloak

        Returns:
            A dictionary filled with tokens info
        """
        return self._keycloak.refresh_token(refresh_token)

    def decode_token(
        self, token: str, options: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Try to decode a JWT token.

        Args:
            token: A JWT token to decode
            options: Options for jose.jwt.decode

        Returns:
            A dictionary filled with decoded token content
        """
        if not self.realm_public_key:
            # Fetch the raw key once, wrap it in PEM markers and cache it on
            # the instance so later calls skip the lookup.
            raw_public_key = self._keycloak.public_key()
            self.realm_public_key = (
                "-----BEGIN PUBLIC KEY-----\n"
                + raw_public_key
                + "\n-----END PUBLIC KEY-----"
            )
        return self._keycloak.decode_token(
            token, key=self.realm_public_key, options=options
        )

    def logout(self, refresh_token: str) -> None:
        """
        Logout a user by closing its authenticated session.

        Args:
            refresh_token: A refresh token provided by Keycloak
        """
        self._keycloak.logout(refresh_token)

    def userinfo(self, access_token: str) -> Dict[str, Any]:
        """
        Return user information from its access token.

        Args:
            access_token: An access token provided by Keycloak

        Returns:
            A dictionary filled with user information
        """
        return self._keycloak.userinfo(access_token)
diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py
index f585fdf..f4dc495 100644
--- a/swh/auth/pytest_plugin.py
+++ b/swh/auth/pytest_plugin.py
@@ -1,178 +1,180 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import copy
from datetime import datetime, timezone
from typing import Dict, List, Optional
from unittest.mock import Mock
from keycloak.exceptions import KeycloakError
import pytest
from swh.auth.keycloak import KeycloakOpenIDConnect
from swh.auth.tests.sample_data import OIDC_PROFILE, RAW_REALM_PUBLIC_KEY, USER_INFO
class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
    """Mock KeycloakOpenIDConnect class to allow testing

    Args:
        server_url: Server main auth url (cf.
            :py:data:`swh.auth.tests.sample_data.SERVER_URL`)
        realm_name: Realm (cf. :py:data:`swh.auth.tests.sample_data.REALM_NAME`)
        client_id: Client id (cf. :py:data:`swh.auth.tests.sample_data.CLIENT_ID`)
        auth_success: boolean flag to simulate authentication success or failure
        exp: expiration delay
        user_groups: user groups configuration (if any); ``None`` means no group
        user_permissions: user permissions configuration (if any); ``None``
            means no permission
        oidc_profile: Dict response from a call to a token authentication query (cf.
            :py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`)
        user_info: Dict response from a call to userinfo query (cf.
            :py:data:`swh.auth.tests.sample_data.USER_INFO`)
        raw_realm_public_key: A raw ascii text representing the realm public key (cf.
            :py:data:`swh.auth.tests.sample_data.RAW_REALM_PUBLIC_KEY`)
    """

    def __init__(
        self,
        server_url: str,
        realm_name: str,
        client_id: str,
        auth_success: bool = True,
        exp: Optional[int] = None,
        user_groups: Optional[List[str]] = None,
        user_permissions: Optional[List[str]] = None,
        oidc_profile: Dict = OIDC_PROFILE,
        user_info: Dict = USER_INFO,
        raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY,
    ):
        super().__init__(
            server_url=server_url, realm_name=realm_name, client_id=client_id
        )
        self.exp = exp
        # A fresh list per instance: a literal ``[]`` default would be shared
        # by every mock built without explicit groups/permissions.
        self.user_groups = [] if user_groups is None else user_groups
        self.user_permissions = [] if user_permissions is None else user_permissions

        def _well_known_registry() -> Dict[str, str]:
            # Evaluated on each call so it reflects the current
            # server_url / realm_name attributes.
            realm_url = f"{self.server_url}realms/{self.realm_name}"
            oidc_url = f"{realm_url}/protocol/openid-connect"
            return {
                "issuer": realm_url,
                "authorization_endpoint": f"{oidc_url}/auth",
                "token_endpoint": f"{oidc_url}/token",
                "token_introspection_endpoint": f"{oidc_url}/token/introspect",
                "userinfo_endpoint": f"{oidc_url}/userinfo",
                "end_session_endpoint": f"{oidc_url}/logout",
                "jwks_uri": f"{oidc_url}/certs",
            }

        # Replace the network-bound calls of the wrapped client with local stubs.
        self._keycloak.public_key = lambda: raw_realm_public_key
        self._keycloak.well_know = _well_known_registry
        self.set_auth_success(auth_success, oidc_profile, user_info)

    def decode_token(self, token):
        """Decode ``token`` and rewrite its time and identity related claims.

        Args:
            token: A JWT token to decode

        Returns:
            The decoded token whose ``exp``, ``auth_time``, ``groups``, ``aud``
            and ``azp`` entries (and the client roles, when user permissions
            are configured) are overridden from the mock configuration.
        """
        options = {}
        if self.auth_success:
            # skip signature expiration and audience checks as we use a static
            # oidc_profile for the tests with expired tokens in it
            options["verify_exp"] = False
            options["verify_aud"] = False
        decoded = super().decode_token(token, options)
        # tweak auth and exp time for tests, keeping the original validity span
        expire_in = decoded["exp"] - decoded["auth_time"]
        if self.exp is not None:
            decoded["exp"] = self.exp
            decoded["auth_time"] = self.exp - expire_in
        else:
            decoded["auth_time"] = int(datetime.now(tz=timezone.utc).timestamp())
            decoded["exp"] = decoded["auth_time"] + expire_in
        decoded["groups"] = self.user_groups
        decoded["aud"] = [self.client_id, "account"]
        decoded["azp"] = self.client_id
        if self.user_permissions:
            decoded["resource_access"][self.client_id] = {
                "roles": self.user_permissions
            }
        return decoded

    def set_auth_success(
        self,
        auth_success: bool,
        oidc_profile: Optional[Dict] = None,
        user_info: Optional[Dict] = None,
    ) -> None:
        """Switch the mock between successful and failing authentication.

        Args:
            auth_success: whether the mocked calls succeed (True) or raise a
                :class:`KeycloakError` (False)
            oidc_profile: value whose copy is returned by the mocked token
                calls on success
            user_info: value whose copy is returned by the mocked userinfo
                call on success
        """
        # following type ignore because mypy is not too happy about affecting mock to
        # method "Cannot assign to a method affecting mock". Ignore for now.
        self.authorization_code = Mock()  # type: ignore
        self.refresh_token = Mock()  # type: ignore
        self.login = Mock()  # type: ignore
        self.userinfo = Mock()  # type: ignore
        self.logout = Mock()  # type: ignore
        self.auth_success = auth_success
        if auth_success:
            # Drop the failing override possibly installed by an earlier
            # set_auth_success(False) so the real method is used again.
            self.__dict__.pop("authorization_url", None)
            self.authorization_code.return_value = copy(oidc_profile)
            self.refresh_token.return_value = copy(oidc_profile)
            self.login.return_value = copy(oidc_profile)
            self.userinfo.return_value = copy(user_info)
        else:
            self.authorization_url = Mock()  # type: ignore
            exception = KeycloakError(
                error_message="Authentication failed", response_code=401
            )
            self.authorization_code.side_effect = exception
            self.authorization_url.side_effect = exception
            self.refresh_token.side_effect = exception
            # login must fail like the other token calls
            self.login.side_effect = exception
            self.userinfo.side_effect = exception
            self.logout.side_effect = exception
def keycloak_mock_factory(
    server_url: str,
    realm_name: str,
    client_id: str,
    auth_success: bool = True,
    exp: Optional[int] = None,
    user_groups: Optional[List[str]] = None,
    user_permissions: Optional[List[str]] = None,
    oidc_profile: Dict = OIDC_PROFILE,
    user_info: Dict = USER_INFO,
    raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY,
):
    """Keycloak mock fixture factory. Report to
    :py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock` docstring.

    ``user_groups`` and ``user_permissions`` default to ``None``, which is
    turned into a new empty list each time the fixture is instantiated.

    Returns:
        A pytest fixture function building a
        :py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock`.
    """

    @pytest.fixture
    def keycloak_open_id_connect():
        return KeycloackOpenIDConnectMock(
            server_url=server_url,
            realm_name=realm_name,
            client_id=client_id,
            auth_success=auth_success,
            exp=exp,
            # Always hand a list to the mock; a new one per fixture call
            # when the factory was not given any.
            user_groups=[] if user_groups is None else user_groups,
            user_permissions=[] if user_permissions is None else user_permissions,
            oidc_profile=oidc_profile,
            user_info=user_info,
            raw_realm_public_key=raw_realm_public_key,
        )

    return keycloak_open_id_connect
diff --git a/swh/auth/tests/test_keycloak.py b/swh/auth/tests/test_keycloak.py
index 15d5caf..5cf774e 100644
--- a/swh/auth/tests/test_keycloak.py
+++ b/swh/auth/tests/test_keycloak.py
@@ -1,95 +1,100 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import copy
from urllib.parse import parse_qs, urlparse
from keycloak.exceptions import KeycloakError
import pytest
from swh.auth.pytest_plugin import keycloak_mock_factory
from swh.auth.tests.sample_data import (
CLIENT_ID,
DECODED_TOKEN,
OIDC_PROFILE,
REALM_NAME,
SERVER_URL,
USER_INFO,
)
# Build the Keycloak mock fixture shared by the tests of this module.
keycloak_mock = keycloak_mock_factory(
    client_id=CLIENT_ID,
    realm_name=REALM_NAME,
    server_url=SERVER_URL,
)
def test_keycloak_well_known(keycloak_mock):
    """The well-known registry exposes exactly the expected set of keys."""
    expected_keys = {
        "issuer",
        "authorization_endpoint",
        "token_endpoint",
        "userinfo_endpoint",
        "end_session_endpoint",
        "jwks_uri",
        "token_introspection_endpoint",
    }
    registry = keycloak_mock.well_known()
    assert set(registry.keys()) == expected_keys
def test_keycloak_authorization_url(keycloak_mock):
    """The authorization URL points at the auth endpoint and carries extra params."""
    auth_endpoint = keycloak_mock.well_known()["authorization_endpoint"]
    url = urlparse(keycloak_mock.authorization_url("http://redirect-uri", foo="bar"))

    # Only the path is compared: the URL additionally holds a query string.
    assert auth_endpoint.endswith(url.path)
    assert parse_qs(url.query) == {
        "client_id": [CLIENT_ID],
        "response_type": ["code"],
        "redirect_uri": ["http://redirect-uri"],
        "foo": ["bar"],
    }
def test_keycloak_authorization_code_fail(keycloak_mock):
    """Exchanging an authorization code raises when authentication fails."""
    # Put the mock in its failing mode before exchanging the code
    keycloak_mock.set_auth_success(False)

    with pytest.raises(KeycloakError):
        keycloak_mock.authorization_code("auth-code", "redirect-uri")
def test_keycloak_authorization_code(keycloak_mock):
    """Exchanging an authorization code returns the OIDC profile."""
    tokens = keycloak_mock.authorization_code("auth-code", "redirect-uri")

    assert tokens == OIDC_PROFILE
def test_keycloak_refresh_token(keycloak_mock):
    """Refreshing a token returns the OIDC profile."""
    refreshed = keycloak_mock.refresh_token("refresh-token")

    assert refreshed == OIDC_PROFILE
def test_keycloak_userinfo(keycloak_mock):
    """Querying user information returns the sample user info."""
    info = keycloak_mock.userinfo("refresh-token")

    assert info == USER_INFO
def test_keycloak_logout(keycloak_mock):
    """Logging out does not raise."""
    keycloak_mock.logout("refresh-token")
def test_keycloak_decode_token(keycloak_mock):
    """Decoding the sample access token yields the expected claims."""
    decoded = keycloak_mock.decode_token(OIDC_PROFILE["access_token"])

    actual = copy(decoded)
    expected = copy(DECODED_TOKEN)
    # "exp" and "auth_time" change on every run, so they are left out of the
    # comparison.
    for claims in (actual, expected):
        for dynamic_key in ("exp", "auth_time"):
            claims.pop(dynamic_key)

    assert actual == expected
+
+
def test_keycloak_login(keycloak_mock):
    """Logging in with a username and password returns the OIDC profile."""
    tokens = keycloak_mock.login("username", "password")

    assert tokens == OIDC_PROFILE

File Metadata

Mime Type
text/x-diff
Expires
Jul 4 2025, 6:16 PM (5 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3253607

Event Timeline