swh/auth/keycloak.py
Show First 20 Lines • Show All 104 Lines • ▼ Show 20 Lines | ) -> Dict[str, Any]: | ||||
return self._keycloak.token( | return self._keycloak.token( | ||||
grant_type="authorization_code", | grant_type="authorization_code", | ||||
code=code, | code=code, | ||||
redirect_uri=redirect_uri, | redirect_uri=redirect_uri, | ||||
**extra_params, | **extra_params, | ||||
) | ) | ||||
def login( | def login( | ||||
self, username: str, password: str, **extra_params: str | self, username: str, password: str, scope: str = "openid", **extra_params: str | ||||
) -> Dict[str, Any]: | ) -> Dict[str, Any]: | ||||
""" | """ | ||||
Get OpenID Connect authentication tokens using Direct Access Grant flow. | Get OpenID Connect authentication tokens using Direct Access Grant flow. | ||||
Raises: | Raises: | ||||
KeycloakError in case of authentication failures | KeycloakError in case of authentication failures | ||||
Args: | Args: | ||||
username: an existing username in the realm | username: an existing username in the realm | ||||
password: password associated to username | password: password associated to username | ||||
extra_params: Extra parameters to add in the authorization request | extra_params: Extra parameters to add in the authorization request | ||||
payload. | payload. | ||||
""" | """ | ||||
return self._keycloak.token( | return self._keycloak.token( | ||||
grant_type="password", | grant_type="password", | ||||
scope="openid", | scope=scope, | ||||
username=username, | username=username, | ||||
password=password, | password=password, | ||||
**extra_params, | **extra_params, | ||||
) | ) | ||||
def refresh_token(self, refresh_token: str) -> Dict[str, Any]: | def refresh_token(self, refresh_token: str) -> Dict[str, Any]: | ||||
""" | """ | ||||
Request a new access token from Keycloak using a refresh token. | Request a new access token from Keycloak using a refresh token. | ||||
▲ Show 20 Lines • Show All 90 Lines • ▼ Show 20 Lines | def from_configfile(cls, **kwargs: Any) -> "KeycloakOpenIDConnect": | ||||
config.update({k: v for k, v in kwargs.items() if v is not None}) | config.update({k: v for k, v in kwargs.items() if v is not None}) | ||||
return cls.from_config(keycloak=config) | return cls.from_config(keycloak=config) | ||||
def keycloak_error_message(keycloak_error: KeycloakError) -> str: | def keycloak_error_message(keycloak_error: KeycloakError) -> str: | ||||
"""Transform a keycloak exception into an error message. | """Transform a keycloak exception into an error message. | ||||
""" | """ | ||||
try: | |||||
# keycloak error wrapped in a JSON document | |||||
msg_dict = json.loads(keycloak_error.error_message.decode()) | msg_dict = json.loads(keycloak_error.error_message.decode()) | ||||
error_msg = msg_dict["error"] | error_msg = msg_dict["error"] | ||||
error_desc = msg_dict.get("error_description") | error_desc = msg_dict.get("error_description") | ||||
if error_desc: | if error_desc: | ||||
error_msg = f"{error_msg}: {error_desc}" | error_msg = f"{error_msg}: {error_desc}" | ||||
return error_msg | return error_msg | ||||
except Exception: | |||||
# fallback: return error message string | |||||
return keycloak_error.error_message |
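Review bot: The new `scope` parameter of `login` is missing from the docstring's Args section, which still lists only username, password and extra_params. The value is forwarded unchanged into the token request of the Direct Access Grant flow, so callers now decide what the issued tokens cover; requesting `offline_access`, for example, would return a long-lived offline refresh token where the realm allows it. I would add `scope: space-separated OpenID Connect scopes to request (default "openid")` to Args, with a sentence telling callers to request only the scopes they need. A value that omits `openid` presumably yields a response without an ID token, depending on the Keycloak version, which is also worth stating there.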