Page MenuHomeSoftware Heritage

keycloak.py
No OneTemporary

keycloak.py

# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from typing import Any, Dict, Optional
from urllib.parse import urlencode
from keycloak import KeycloakOpenID
# The next import is required to allow callers to catch on their own term the following
# exception
from keycloak.exceptions import KeycloakError # noqa
from swh.core.config import load_from_envvar
class KeycloakOpenIDConnect:
"""
Wrapper class around python-keycloak to ease the interaction with Keycloak
for managing authentication and user permissions with OpenID Connect.
"""
def __init__(
self,
server_url: str,
realm_name: str,
client_id: str,
realm_public_key: str = "",
):
"""
Args:
server_url: URL of the Keycloak server
realm_name: The realm name
client_id: The OpenID Connect client identifier
realm_public_key: The realm public key (will be dynamically
retrieved if not provided)
"""
self._keycloak = KeycloakOpenID(
server_url=server_url, client_id=client_id, realm_name=realm_name,
)
self.server_url = server_url
self.realm_name = realm_name
self.client_id = client_id
self.realm_public_key = realm_public_key
def well_known(self) -> Dict[str, Any]:
"""
Retrieve the OpenID Connect Well-Known URI registry from Keycloak.
Returns:
A dictionary filled with OpenID Connect URIS.
"""
return self._keycloak.well_know()
def authorization_url(self, redirect_uri: str, **extra_params: str) -> str:
"""
Get OpenID Connect authorization URL to authenticate users.
Args:
redirect_uri: URI to redirect to once a user is authenticated
extra_params: Extra query parameters to add to the
authorization URL
"""
auth_url = self._keycloak.auth_url(redirect_uri)
if extra_params:
auth_url += "&%s" % urlencode(extra_params)
return auth_url
def authorization_code(
self, code: str, redirect_uri: str, **extra_params: str
) -> Dict[str, Any]:
"""
Get OpenID Connect authentication tokens using Authorization
Code flow.
Raises:
KeycloakError in case of authentication failures
Args:
code: Authorization code provided by Keycloak
redirect_uri: URI to redirect to once a user is authenticated
(must be the same as the one provided to authorization_url):
extra_params: Extra parameters to add in the authorization request
payload.
"""
return self._keycloak.token(
grant_type="authorization_code",
code=code,
redirect_uri=redirect_uri,
**extra_params,
)
def login(
self, username: str, password: str, **extra_params: str
) -> Dict[str, Any]:
"""
Get OpenID Connect authentication tokens using Direct Access Grant flow.
Raises:
KeycloakError in case of authentication failures
Args:
username: an existing username in the realm
password: password associated to username
extra_params: Extra parameters to add in the authorization request
payload.
"""
return self._keycloak.token(
grant_type="password",
scope="openid",
username=username,
password=password,
**extra_params,
)
def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
"""
Request a new access token from Keycloak using a refresh token.
Args:
refresh_token: A refresh token provided by Keycloak
Returns:
A dictionary filled with tokens info
"""
return self._keycloak.refresh_token(refresh_token)
def decode_token(
self, token: str, options: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Try to decode a JWT token.
Args:
token: A JWT token to decode
options: Options for jose.jwt.decode
Returns:
A dictionary filled with decoded token content
"""
if not self.realm_public_key:
realm_public_key = self._keycloak.public_key()
self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n"
self.realm_public_key += realm_public_key
self.realm_public_key += "\n-----END PUBLIC KEY-----"
return self._keycloak.decode_token(
token, key=self.realm_public_key, options=options
)
def logout(self, refresh_token: str) -> None:
"""
Logout a user by closing its authenticated session.
Args:
refresh_token: A refresh token provided by Keycloak
"""
self._keycloak.logout(refresh_token)
def userinfo(self, access_token: str) -> Dict[str, Any]:
"""
Return user information from its access token.
Args:
access_token: An access token provided by Keycloak
Returns:
A dictionary fillled with user information
"""
return self._keycloak.userinfo(access_token)
@classmethod
def from_config(cls, **kwargs: Any) -> "KeycloakOpenIDConnect":
"""Instantiate a KeycloakOpenIDConnect class from a configuration dict.
Args:
kwargs: configuration dict for the instance, with one keycloak key, whose
value is a Dict with the following keys:
- server_url: URL of the Keycloak server
- realm_name: The realm name
- client_id: The OpenID Connect client identifier
Returns:
the KeycloakOpenIDConnect instance
"""
cfg = kwargs["keycloak"]
return cls(
server_url=cfg["server_url"],
realm_name=cfg["realm_name"],
client_id=cfg["client_id"],
)
@classmethod
def from_configfile(cls, **kwargs: Any) -> "KeycloakOpenIDConnect":
"""Instantiate a KeycloakOpenIDConnect class from the configuration loaded from the
SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their
value is not None.
Args:
kwargs: kwargs passed to instantiation call
Returns:
the KeycloakOpenIDConnect instance
"""
config = dict(load_from_envvar()).get("keycloak", {})
config.update({k: v for k, v in kwargs.items() if v is not None})
return cls.from_config(keycloak=config)
def keycloak_error_message(keycloak_error: KeycloakError) -> str:
"""Transform a keycloak exception into an error message.
"""
msg_dict = json.loads(keycloak_error.error_message.decode())
error_msg = msg_dict["error"]
error_desc = msg_dict.get("error_description")
if error_desc:
error_msg = f"{error_msg}: {error_desc}"
return error_msg

File Metadata

Mime Type
text/x-python
Expires
Jun 4 2025, 7:43 PM (10 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3376213

Event Timeline