diff --git a/requirements.txt b/requirements.txt index 54ce666..a536a34 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html - +python-keycloak >= 0.19.0 diff --git a/swh/auth/__init__.py b/swh/auth/__init__.py index e69de29..6741769 100644 --- a/swh/auth/__init__.py +++ b/swh/auth/__init__.py @@ -0,0 +1,140 @@ +# Copyright (C) 2020-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Optional, Tuple +from urllib.parse import urlencode + +from keycloak import KeycloakOpenID + + +class KeycloakOpenIDConnect: + """ + Wrapper class around python-keycloak to ease the interaction with Keycloak + for managing authentication and user permissions with OpenID Connect. + """ + + def __init__( + self, + server_url: str, + realm_name: str, + client_id: str, + realm_public_key: str = "", + ): + """ + Args: + server_url: URL of the Keycloak server + realm_name: The realm name + client_id: The OpenID Connect client identifier + realm_public_key: The realm public key (will be dynamically + retrieved if not provided) + """ + self._keycloak = KeycloakOpenID( + server_url=server_url, client_id=client_id, realm_name=realm_name, + ) + + self.server_url = server_url + self.realm_name = realm_name + self.client_id = client_id + self.realm_public_key = realm_public_key + + def well_known(self) -> Dict[str, Any]: + """ + Retrieve the OpenID Connect Well-Known URI registry from Keycloak. + + Returns: + A dictionary filled with OpenID Connect URIS. + """ + return self._keycloak.well_know() + + def authorization_url(self, redirect_uri: str, **extra_params: str) -> str: + """ + Get OpenID Connect authorization URL to authenticate users. + + Args: + redirect_uri: URI to redirect to once a user is authenticated + extra_params: Extra query parameters to add to the + authorization URL + """ + auth_url = self._keycloak.auth_url(redirect_uri) + if extra_params: + auth_url += "&%s" % urlencode(extra_params) + return auth_url + + def authorization_code( + self, code: str, redirect_uri: str, **extra_params: str + ) -> Dict[str, Any]: + """ + Get OpenID Connect authentication tokens using Authorization + Code flow. + + Args: + code: Authorization code provided by Keycloak + redirect_uri: URI to redirect to once a user is authenticated + (must be the same as the one provided to authorization_url): + extra_params: Extra parameters to add in the authorization request + payload. + """ + return self._keycloak.token( + grant_type="authorization_code", + code=code, + redirect_uri=redirect_uri, + **extra_params, + ) + + def refresh_token(self, refresh_token: str) -> Dict[str, Any]: + """ + Request a new access token from Keycloak using a refresh token. + + Args: + refresh_token: A refresh token provided by Keycloak + + Returns: + A dictionary filled with tokens info + """ + return self._keycloak.refresh_token(refresh_token) + + def decode_token( + self, token: str, options: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Try to decode a JWT token. + + Args: + token: A JWT token to decode + options: Options for jose.jwt.decode + + Returns: + A dictionary filled with decoded token content + """ + if not self.realm_public_key: + realm_public_key = self._keycloak.public_key() + self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n" + self.realm_public_key += realm_public_key + self.realm_public_key += "\n-----END PUBLIC KEY-----" + + return self._keycloak.decode_token( + token, key=self.realm_public_key, options=options + ) + + def logout(self, refresh_token: str) -> None: + """ + Logout a user by closing its authenticated session. + + Args: + refresh_token: A refresh token provided by Keycloak + """ + self._keycloak.logout(refresh_token) + + def userinfo(self, access_token: str) -> Dict[str, Any]: + """ + Return user information from its access token. + + Args: + access_token: An access token provided by Keycloak + + Returns: + A dictionary fillled with user information + """ + return self._keycloak.userinfo(access_token)