Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/auth/keycloak.py b/swh/auth/keycloak.py
index e06ab7e..d17b11e 100644
--- a/swh/auth/keycloak.py
+++ b/swh/auth/keycloak.py
@@ -1,246 +1,250 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from typing import Any, Dict, Optional
from urllib.parse import urlencode
# add ExpiredSignatureError alias to avoid leaking jose import
# in swh-auth client code
from jose.jwt import ExpiredSignatureError # noqa
from keycloak import KeycloakOpenID
# add KeycloakError alias to avoid leaking keycloak import
# in swh-auth client code
from keycloak.exceptions import KeycloakError # noqa
from swh.core.config import load_from_envvar
class KeycloakOpenIDConnect:
"""
Wrapper class around python-keycloak to ease the interaction with Keycloak
for managing authentication and user permissions with OpenID Connect.
"""
def __init__(
self,
server_url: str,
realm_name: str,
client_id: str,
realm_public_key: str = "",
):
"""
Args:
server_url: URL of the Keycloak server
realm_name: The realm name
client_id: The OpenID Connect client identifier
realm_public_key: The realm public key (will be dynamically
retrieved if not provided)
"""
self._keycloak = KeycloakOpenID(
server_url=server_url,
client_id=client_id,
realm_name=realm_name,
)
self.server_url = server_url
self.realm_public_key = realm_public_key
@property
def realm_name(self):
return self._keycloak.realm_name
@realm_name.setter
def realm_name(self, value):
self._keycloak.realm_name = value
@property
def client_id(self):
return self._keycloak.client_id
@client_id.setter
def client_id(self, value):
self._keycloak.client_id = value
def well_known(self) -> Dict[str, Any]:
"""
Retrieve the OpenID Connect Well-Known URI registry from Keycloak.
Returns:
A dictionary filled with OpenID Connect URIS.
"""
- return self._keycloak.well_know()
+ try:
+ return self._keycloak.well_known()
+ except AttributeError:
+ # python-keycloak < 1.0.0
+ return self._keycloak.well_know()
def authorization_url(self, redirect_uri: str, **extra_params: str) -> str:
"""
Get OpenID Connect authorization URL to authenticate users.
Args:
redirect_uri: URI to redirect to once a user is authenticated
extra_params: Extra query parameters to add to the
authorization URL
"""
auth_url = self._keycloak.auth_url(redirect_uri)
if extra_params:
auth_url += "&%s" % urlencode(extra_params)
return auth_url
def authorization_code(
self, code: str, redirect_uri: str, **extra_params: str
) -> Dict[str, Any]:
"""
Get OpenID Connect authentication tokens using Authorization
Code flow.
Raises:
KeycloakError in case of authentication failures
Args:
code: Authorization code provided by Keycloak
redirect_uri: URI to redirect to once a user is authenticated
(must be the same as the one provided to authorization_url):
extra_params: Extra parameters to add in the authorization request
payload.
"""
return self._keycloak.token(
grant_type="authorization_code",
code=code,
redirect_uri=redirect_uri,
**extra_params,
)
def login(
self, username: str, password: str, scope: str = "openid", **extra_params: str
) -> Dict[str, Any]:
"""
Get OpenID Connect authentication tokens using Direct Access Grant flow.
Raises:
KeycloakError in case of authentication failures
Args:
username: an existing username in the realm
password: password associated to username
extra_params: Extra parameters to add in the authorization request
payload.
"""
return self._keycloak.token(
grant_type="password",
scope=scope,
username=username,
password=password,
**extra_params,
)
def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
"""
Request a new access token from Keycloak using a refresh token.
Args:
refresh_token: A refresh token provided by Keycloak
Returns:
A dictionary filled with tokens info
"""
return self._keycloak.refresh_token(refresh_token)
def decode_token(
self, token: str, options: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Try to decode a JWT token.
Args:
token: A JWT token to decode
options: Options for jose.jwt.decode
Returns:
A dictionary filled with decoded token content
"""
if not self.realm_public_key:
realm_public_key = self._keycloak.public_key()
self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n"
self.realm_public_key += realm_public_key
self.realm_public_key += "\n-----END PUBLIC KEY-----"
return self._keycloak.decode_token(
token, key=self.realm_public_key, options=options
)
def logout(self, refresh_token: str) -> None:
"""
Logout a user by closing its authenticated session.
Args:
refresh_token: A refresh token provided by Keycloak
"""
self._keycloak.logout(refresh_token)
def userinfo(self, access_token: str) -> Dict[str, Any]:
"""
Return user information from its access token.
Args:
access_token: An access token provided by Keycloak
Returns:
A dictionary fillled with user information
"""
return self._keycloak.userinfo(access_token)
@classmethod
def from_config(cls, **kwargs: Any) -> "KeycloakOpenIDConnect":
"""Instantiate a KeycloakOpenIDConnect class from a configuration dict.
Args:
kwargs: configuration dict for the instance, with one keycloak key, whose
value is a Dict with the following keys:
- server_url: URL of the Keycloak server
- realm_name: The realm name
- client_id: The OpenID Connect client identifier
Returns:
the KeycloakOpenIDConnect instance
"""
cfg = kwargs["keycloak"]
return cls(
server_url=cfg["server_url"],
realm_name=cfg["realm_name"],
client_id=cfg["client_id"],
)
@classmethod
def from_configfile(cls, **kwargs: Any) -> "KeycloakOpenIDConnect":
"""Instantiate a KeycloakOpenIDConnect class from the configuration loaded from the
SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their
value is not None.
Args:
kwargs: kwargs passed to instantiation call
Returns:
the KeycloakOpenIDConnect instance
"""
config = dict(load_from_envvar()).get("keycloak", {})
config.update({k: v for k, v in kwargs.items() if v is not None})
return cls.from_config(keycloak=config)
def keycloak_error_message(keycloak_error: KeycloakError) -> str:
"""Transform a keycloak exception into an error message."""
try:
# keycloak error wrapped in a JSON document
msg_dict = json.loads(keycloak_error.error_message.decode())
error_msg = msg_dict["error"]
error_desc = msg_dict.get("error_description")
if error_desc:
error_msg = f"{error_msg}: {error_desc}"
return error_msg
except Exception:
# fallback: return error message string
return keycloak_error.error_message
diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py
index 9dd5e5b..7f0aa25 100644
--- a/swh/auth/pytest_plugin.py
+++ b/swh/auth/pytest_plugin.py
@@ -1,229 +1,232 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import copy
from datetime import datetime, timezone
import json
from typing import Dict, List, Optional
from unittest.mock import Mock
from keycloak.exceptions import KeycloakError
import pytest
from swh.auth.keycloak import KeycloakOpenIDConnect
from swh.auth.tests.sample_data import (
CLIENT_ID,
OIDC_PROFILE,
RAW_REALM_PUBLIC_KEY,
REALM_NAME,
SERVER_URL,
USER_INFO,
)
class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
"""Mock KeycloakOpenIDConnect class to allow testing
Args:
server_url: Server main auth url (cf.
:py:data:`swh.auth.tests.sample_data.SERVER_URL`)
realm_name: Realm (cf. :py:data:`swh.auth.tests.sample_data.REALM_NAME`)
client_id: Client id (cf. :py:data:`swh.auth.tests.sample_data.CLIENT_ID`)
auth_success: boolean flag to simulate authentication success or failure
exp: expiration delay
user_groups: user groups configuration (if any)
realm_permissions: user permissions configuration at realm level (if any)
client_permissions: user permissions configuration at client level (if any)
oidc_profile: Dict response from a call to a token authentication query (cf.
:py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`)
user_info: Dict response from a call to userinfo query (cf.
:py:data:`swh.auth.tests.sample_data.USER_INFO`)
raw_realm_public_key: A raw ascii text representing the realm public key (cf.
:py:data:`swh.auth.tests.sample_data.RAW_REALM_PUBLIC_KEY`)
"""
def __init__(
self,
server_url: str,
realm_name: str,
client_id: str,
auth_success: bool = True,
exp: Optional[int] = None,
user_groups: List[str] = [],
realm_permissions: List[str] = [],
client_permissions: List[str] = [],
oidc_profile: Dict = OIDC_PROFILE,
user_info: Dict = USER_INFO,
raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY,
):
"""Constructor"""
super().__init__(
server_url=server_url, realm_name=realm_name, client_id=client_id
)
self.exp = exp
self.user_groups = user_groups
self.realm_permissions = realm_permissions
self.client_permissions = client_permissions
self._keycloak.public_key = lambda: raw_realm_public_key
- self._keycloak.well_know = lambda: {
+ self._keycloak.well_known = lambda: {
"issuer": f"{self.server_url}realms/{self.realm_name}",
"authorization_endpoint": (
f"{self.server_url}realms/"
f"{self.realm_name}/protocol/"
"openid-connect/auth"
),
"token_endpoint": (
f"{self.server_url}realms/{self.realm_name}/"
"protocol/openid-connect/token"
),
"token_introspection_endpoint": (
f"{self.server_url}realms/"
f"{self.realm_name}/protocol/"
"openid-connect/token/"
"introspect"
),
"userinfo_endpoint": (
f"{self.server_url}realms/{self.realm_name}/"
"protocol/openid-connect/userinfo"
),
"end_session_endpoint": (
f"{self.server_url}realms/"
f"{self.realm_name}/protocol/"
"openid-connect/logout"
),
"jwks_uri": (
f"{self.server_url}realms/{self.realm_name}/"
"protocol/openid-connect/certs"
),
}
+ # for python-keycloak < 1.0.0:
+ self._keycloak.well_know = self._keycloak.well_known
+
self.set_auth_success(auth_success, oidc_profile, user_info)
def decode_token(self, token):
options = {}
if self.auth_success:
# skip signature expiration and audience checks as we use a static
# oidc_profile for the tests with expired tokens in it
options["verify_exp"] = False
options["verify_aud"] = False
decoded = super().decode_token(token, options)
# Merge the user info configured to be part of the decode token
userinfo = self.userinfo()
if userinfo is not None:
decoded = {**decoded, **userinfo}
# tweak auth and exp time for tests
expire_in = decoded["exp"] - decoded["iat"]
if self.exp is not None:
decoded["exp"] = self.exp
decoded["iat"] = self.exp - expire_in
else:
now = int(datetime.now(tz=timezone.utc).timestamp())
decoded["iat"] = now
decoded["exp"] = now + expire_in
decoded["groups"] = self.user_groups
decoded["aud"] = [self.client_id, "account"]
decoded["azp"] = self.client_id
decoded["realm_access"]["roles"] += self.realm_permissions
if self.client_permissions:
decoded["resource_access"][self.client_id] = {
"roles": self.client_permissions
}
return decoded
def set_auth_success(
self,
auth_success: bool,
oidc_profile: Optional[Dict] = None,
user_info: Optional[Dict] = None,
) -> None:
# following type ignore because mypy is not too happy about affecting mock to
# method "Cannot assign to a method affecting mock". Ignore for now.
self.authorization_code = Mock() # type: ignore
self.refresh_token = Mock() # type: ignore
self.login = Mock() # type: ignore
self.userinfo = Mock() # type: ignore
self.logout = Mock() # type: ignore
self.auth_success = auth_success
if auth_success:
self.authorization_code.return_value = copy(oidc_profile)
self.refresh_token.return_value = copy(oidc_profile)
self.login.return_value = copy(oidc_profile)
self.userinfo.return_value = copy(user_info)
else:
self.authorization_url = Mock() # type: ignore
error = {
"error": "invalid_grant",
"error_description": "Invalid user credentials",
}
error_message = json.dumps(error).encode()
exception = KeycloakError(error_message=error_message, response_code=401)
self.authorization_code.side_effect = exception
self.authorization_url.side_effect = exception
self.refresh_token.side_effect = exception
self.userinfo.side_effect = exception
self.logout.side_effect = exception
self.login.side_effect = exception
def keycloak_oidc_factory(
server_url: str,
realm_name: str,
client_id: str,
auth_success: bool = True,
exp: Optional[int] = None,
user_groups: List[str] = [],
realm_permissions: List[str] = [],
client_permissions: List[str] = [],
oidc_profile: Dict = OIDC_PROFILE,
user_info: Dict = USER_INFO,
raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY,
):
"""Keycloak mock fixture factory. Report to
:py:class:`swh.auth.pytest_plugin.KeycloackOpenIDConnectMock` docstring.
"""
@pytest.fixture
def keycloak_oidc():
return KeycloackOpenIDConnectMock(
server_url=server_url,
realm_name=realm_name,
client_id=client_id,
auth_success=auth_success,
exp=exp,
user_groups=user_groups,
realm_permissions=realm_permissions,
client_permissions=client_permissions,
oidc_profile=oidc_profile,
user_info=user_info,
raw_realm_public_key=raw_realm_public_key,
)
return keycloak_oidc
# for backward compatibility
# TODO: remove that alias once swh-deposit and swh-web use new function name
keycloak_mock_factory = keycloak_oidc_factory
# generic keycloak fixture that can be used within tests
# (cf. test_keycloak.py, test_utils.py, django related tests)
# or external modules using that pytest plugin
_keycloak_oidc = keycloak_oidc_factory(
server_url=SERVER_URL,
realm_name=REALM_NAME,
client_id=CLIENT_ID,
)
@pytest.fixture
def keycloak_oidc(_keycloak_oidc, mocker):
for oidc_client_factory in (
"swh.auth.django.views.keycloak_oidc_client",
"swh.auth.django.backends.keycloak_oidc_client",
):
keycloak_oidc_client = mocker.patch(oidc_client_factory)
keycloak_oidc_client.return_value = _keycloak_oidc
return _keycloak_oidc

File Metadata

Mime Type
text/x-diff
Expires
Jul 4 2025, 7:50 AM (10 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3293013

Event Timeline