Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8395709
keycloak.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
keycloak.py
View Options
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
json
from
typing
import
Any
,
Dict
,
Optional
from
urllib.parse
import
urlencode
from
keycloak
import
KeycloakOpenID
# The next import is not used in this module: it re-exports KeycloakError so that
# callers can catch that exception on their own terms
from
keycloak.exceptions
import
KeycloakError
# noqa
from
swh.core.config
import
load_from_envvar
class KeycloakOpenIDConnect:
    """
    Wrapper class around python-keycloak to ease the interaction with Keycloak
    for managing authentication and user permissions with OpenID Connect.

    Every method delegates to a ``KeycloakOpenID`` client created at
    construction time and stored in ``self._keycloak``.
    """

    def __init__(
        self,
        server_url: str,
        realm_name: str,
        client_id: str,
        realm_public_key: str = "",
    ):
        """
        Args:
            server_url: URL of the Keycloak server
            realm_name: The realm name
            client_id: The OpenID Connect client identifier
            realm_public_key: The realm public key, used as is as the decoding
                key in :meth:`decode_token` (will be dynamically retrieved on
                the first decoding if not provided)
        """
        self._keycloak = KeycloakOpenID(
            server_url=server_url, client_id=client_id, realm_name=realm_name,
        )
        self.server_url = server_url
        self.realm_name = realm_name
        self.client_id = client_id
        self.realm_public_key = realm_public_key

    def well_known(self) -> Dict[str, Any]:
        """
        Retrieve the OpenID Connect Well-Known URI registry from Keycloak.

        Returns:
            A dictionary filled with OpenID Connect URIs.
        """
        # The wrapped client historically exposes this call under the misspelt
        # name ``well_know``; keep using it when available and fall back to the
        # correctly spelt ``well_known`` otherwise.
        # NOTE(review): the fallback assumes newer python-keycloak releases
        # expose ``well_known`` -- confirm against the installed version.
        fetch = getattr(self._keycloak, "well_know", None)
        if fetch is None:
            fetch = self._keycloak.well_known
        return fetch()

    def authorization_url(self, redirect_uri: str, **extra_params: str) -> str:
        """
        Get OpenID Connect authorization URL to authenticate users.

        Args:
            redirect_uri: URI to redirect to once a user is authenticated
            extra_params: Extra query parameters to add to the
                authorization URL

        Returns:
            The authorization URL, with ``extra_params`` url-encoded and
            appended to its query string when some are provided.
        """
        auth_url = self._keycloak.auth_url(redirect_uri)
        if extra_params:
            # Start a query string if the base URL does not have one yet,
            # instead of blindly appending with "&".
            separator = "&" if "?" in auth_url else "?"
            auth_url += separator + urlencode(extra_params)
        return auth_url

    def authorization_code(
        self, code: str, redirect_uri: str, **extra_params: str
    ) -> Dict[str, Any]:
        """
        Get OpenID Connect authentication tokens using Authorization
        Code flow.

        Raises:
            KeycloakError in case of authentication failures

        Args:
            code: Authorization code provided by Keycloak
            redirect_uri: URI to redirect to once a user is authenticated
                (must be the same as the one provided to authorization_url)
            extra_params: Extra parameters to add in the authorization request
                payload.

        Returns:
            The value returned by the token request of the wrapped client.
        """
        return self._keycloak.token(
            grant_type="authorization_code",
            code=code,
            redirect_uri=redirect_uri,
            **extra_params,
        )

    def login(
        self, username: str, password: str, **extra_params: str
    ) -> Dict[str, Any]:
        """
        Get OpenID Connect authentication tokens using Direct Access Grant flow.

        The request is always sent with the ``openid`` scope.

        Raises:
            KeycloakError in case of authentication failures

        Args:
            username: an existing username in the realm
            password: password associated to username
            extra_params: Extra parameters to add in the authorization request
                payload.

        Returns:
            The value returned by the token request of the wrapped client.
        """
        return self._keycloak.token(
            grant_type="password",
            scope="openid",
            username=username,
            password=password,
            **extra_params,
        )

    def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
        """
        Request a new access token from Keycloak using a refresh token.

        Args:
            refresh_token: A refresh token provided by Keycloak

        Returns:
            A dictionary filled with tokens info
        """
        return self._keycloak.refresh_token(refresh_token)

    def decode_token(
        self, token: str, options: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Try to decode a JWT token.

        When no realm public key is known yet, it is fetched once from
        Keycloak, wrapped between PEM ``BEGIN/END PUBLIC KEY`` markers and
        kept on the instance for subsequent calls.

        Args:
            token: A JWT token to decode
            options: Options for jose.jwt.decode

        Returns:
            A dictionary filled with decoded token content
        """
        if not self.realm_public_key:
            # Fetch first, assign after: a failing retrieval leaves the
            # attribute empty so that the next call retries.
            raw_key = self._keycloak.public_key()
            self.realm_public_key = (
                f"-----BEGIN PUBLIC KEY-----\n{raw_key}\n-----END PUBLIC KEY-----"
            )

        return self._keycloak.decode_token(
            token, key=self.realm_public_key, options=options
        )

    def logout(self, refresh_token: str) -> None:
        """
        Logout a user by closing its authenticated session.

        Args:
            refresh_token: A refresh token provided by Keycloak
        """
        self._keycloak.logout(refresh_token)

    def userinfo(self, access_token: str) -> Dict[str, Any]:
        """
        Return user information from its access token.

        Args:
            access_token: An access token provided by Keycloak

        Returns:
            A dictionary filled with user information
        """
        return self._keycloak.userinfo(access_token)

    @classmethod
    def from_config(cls, **kwargs: Any) -> "KeycloakOpenIDConnect":
        """Instantiate a KeycloakOpenIDConnect class from a configuration dict.

        Args:
            kwargs: configuration dict for the instance, with one keycloak key, whose
                value is a Dict with the following keys:

                - server_url: URL of the Keycloak server
                - realm_name: The realm name
                - client_id: The OpenID Connect client identifier
                - realm_public_key: (optional) The realm public key

        Raises:
            KeyError: if the keycloak key or one of its mandatory keys is missing

        Returns:
            the KeycloakOpenIDConnect instance
        """
        cfg = kwargs["keycloak"]
        init_args = {
            key: cfg[key] for key in ("server_url", "realm_name", "client_id")
        }
        # Only forward the optional key when it is actually set, so that the
        # constructor call is unchanged for configurations without it.
        public_key = cfg.get("realm_public_key")
        if public_key:
            init_args["realm_public_key"] = public_key
        return cls(**init_args)

    @classmethod
    def from_configfile(cls, **kwargs: Any) -> "KeycloakOpenIDConnect":
        """Instantiate a KeycloakOpenIDConnect class from the configuration loaded from the
        SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their
        value is not None.

        Args:
            kwargs: kwargs passed to instantiation call; they override the
                values of the keycloak section of the loaded configuration

        Returns:
            the KeycloakOpenIDConnect instance
        """
        loaded = dict(load_from_envvar())
        # Work on a copy (and tolerate a missing or null section) so that the
        # loaded configuration is not mutated by the overrides below.
        config = dict(loaded.get("keycloak") or {})
        config.update({k: v for k, v in kwargs.items() if v is not None})
        return cls.from_config(keycloak=config)
def keycloak_error_message(keycloak_error: KeycloakError) -> str:
    """Transform a keycloak exception into an error message.

    The ``error_message`` attribute of the exception is expected to hold a
    JSON document with an ``error`` key and an optional ``error_description``
    key, in which case ``"<error>: <error_description>"`` (or just
    ``"<error>"``) is returned. Any other payload (plain text, invalid JSON,
    JSON without an ``error`` key) is returned as text instead of raising.

    Args:
        keycloak_error: the exception to extract a message from

    Returns:
        A human readable error message
    """
    raw_message = keycloak_error.error_message
    # The message may be provided either as bytes or as an already decoded
    # string; normalize to text before parsing.
    if isinstance(raw_message, (bytes, bytearray)):
        text = raw_message.decode(errors="replace")
    else:
        text = str(raw_message)

    try:
        payload = json.loads(text)
    except ValueError:
        # Not a JSON document: the raw text is the best message available.
        return text

    if not isinstance(payload, dict) or "error" not in payload:
        return text

    error_msg = payload["error"]
    error_desc = payload.get("error_description")
    if error_desc:
        error_msg = f"{error_msg}: {error_desc}"
    return error_msg
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jun 4 2025, 7:43 PM (10 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3376213
Attached To
rDAUTH Common authentication libraries
Event Timeline
Log In to Comment