Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9348313
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Subscribers
None
View Options
diff --git a/requirements.txt b/requirements.txt
index 54ce666..a536a34 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-
+python-keycloak >= 0.19.0
diff --git a/swh/auth/__init__.py b/swh/auth/__init__.py
index e69de29..6741769 100644
--- a/swh/auth/__init__.py
+++ b/swh/auth/__init__.py
@@ -0,0 +1,140 @@
+# Copyright (C) 2020-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, Optional, Tuple
+from urllib.parse import urlencode
+
+from keycloak import KeycloakOpenID
+
+
+class KeycloakOpenIDConnect:
+ """
+ Wrapper class around python-keycloak to ease the interaction with Keycloak
+ for managing authentication and user permissions with OpenID Connect.
+ """
+
+ def __init__(
+ self,
+ server_url: str,
+ realm_name: str,
+ client_id: str,
+ realm_public_key: str = "",
+ ):
+ """
+ Args:
+ server_url: URL of the Keycloak server
+ realm_name: The realm name
+ client_id: The OpenID Connect client identifier
+ realm_public_key: The realm public key (will be dynamically
+ retrieved if not provided)
+ """
+ self._keycloak = KeycloakOpenID(
+ server_url=server_url, client_id=client_id, realm_name=realm_name,
+ )
+
+ self.server_url = server_url
+ self.realm_name = realm_name
+ self.client_id = client_id
+ self.realm_public_key = realm_public_key
+
+ def well_known(self) -> Dict[str, Any]:
+ """
+ Retrieve the OpenID Connect Well-Known URI registry from Keycloak.
+
+ Returns:
+ A dictionary filled with OpenID Connect URIs.
+ """
+ return self._keycloak.well_know()
+
+ def authorization_url(self, redirect_uri: str, **extra_params: str) -> str:
+ """
+ Get OpenID Connect authorization URL to authenticate users.
+
+ Args:
+ redirect_uri: URI to redirect to once a user is authenticated
+ extra_params: Extra query parameters to add to the
+ authorization URL
+ """
+ auth_url = self._keycloak.auth_url(redirect_uri)
+ if extra_params:
+ auth_url += "&%s" % urlencode(extra_params)
+ return auth_url
+
+ def authorization_code(
+ self, code: str, redirect_uri: str, **extra_params: str
+ ) -> Dict[str, Any]:
+ """
+ Get OpenID Connect authentication tokens using Authorization
+ Code flow.
+
+ Args:
+ code: Authorization code provided by Keycloak
+ redirect_uri: URI to redirect to once a user is authenticated
+ (must be the same as the one provided to authorization_url)
+ extra_params: Extra parameters to add in the authorization request
+ payload.
+ """
+ return self._keycloak.token(
+ grant_type="authorization_code",
+ code=code,
+ redirect_uri=redirect_uri,
+ **extra_params,
+ )
+
+ def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
+ """
+ Request a new access token from Keycloak using a refresh token.
+
+ Args:
+ refresh_token: A refresh token provided by Keycloak
+
+ Returns:
+ A dictionary filled with tokens info
+ """
+ return self._keycloak.refresh_token(refresh_token)
+
+ def decode_token(
+ self, token: str, options: Optional[Dict[str, Any]] = None
+ ) -> Dict[str, Any]:
+ """
+ Try to decode a JWT token.
+
+ Args:
+ token: A JWT token to decode
+ options: Options for jose.jwt.decode
+
+ Returns:
+ A dictionary filled with decoded token content
+ """
+ if not self.realm_public_key:
+ realm_public_key = self._keycloak.public_key()
+ self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n"
+ self.realm_public_key += realm_public_key
+ self.realm_public_key += "\n-----END PUBLIC KEY-----"
+
+ return self._keycloak.decode_token(
+ token, key=self.realm_public_key, options=options
+ )
+
+ def logout(self, refresh_token: str) -> None:
+ """
+ Log out a user by closing their authenticated session.
+
+ Args:
+ refresh_token: A refresh token provided by Keycloak
+ """
+ self._keycloak.logout(refresh_token)
+
+ def userinfo(self, access_token: str) -> Dict[str, Any]:
+ """
+ Return user information from its access token.
+
+ Args:
+ access_token: An access token provided by Keycloak
+
+ Returns:
+ A dictionary filled with user information
+ """
+ return self._keycloak.userinfo(access_token)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 6:23 PM (5 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3276091
Attached To
rDAUTH Common authentication libraries
Event Timeline
Log In to Comment