Changeset View
Changeset View
Standalone View
Standalone View
swh/web/auth/backends.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime, timedelta | from datetime import datetime, timedelta | ||||
from typing import Any, Dict, Optional, Tuple | from typing import Any, Dict, Optional | ||||
from django.core.cache import cache | from django.core.cache import cache | ||||
from django.http import HttpRequest | from django.http import HttpRequest | ||||
from django.utils import timezone | from django.utils import timezone | ||||
from rest_framework.authentication import BaseAuthentication | from rest_framework.authentication import BaseAuthentication | ||||
from rest_framework.exceptions import AuthenticationFailed | from rest_framework.exceptions import AuthenticationFailed | ||||
import sentry_sdk | import sentry_sdk | ||||
from swh.web.auth.keycloak import KeycloakOpenIDConnect | from swh.web.auth.keycloak import KeycloakOpenIDConnect | ||||
from swh.web.auth.utils import get_oidc_client | from swh.web.auth.utils import get_oidc_client | ||||
from swh.web.auth.models import OIDCUser | from swh.web.auth.models import OIDCUser | ||||
# OpenID Connect client to communicate with Keycloak server | # OpenID Connect client to communicate with Keycloak server | ||||
_oidc_client: KeycloakOpenIDConnect = get_oidc_client() | _oidc_client: KeycloakOpenIDConnect = get_oidc_client() | ||||
def _oidc_user_from_info(userinfo: Dict[str, Any]) -> OIDCUser: | def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser: | ||||
# compute an integer user identifier for Django User model | # compute an integer user identifier for Django User model | ||||
# by concatenating all groups of the UUID4 user identifier | # by concatenating all groups of the UUID4 user identifier | ||||
# generated by Keycloak and converting it from hex to decimal | # generated by Keycloak and converting it from hex to decimal | ||||
user_id = int(''.join(userinfo['sub'].split('-')), 16) | user_id = int(''.join(decoded_token['sub'].split('-')), 16) | ||||
# create a Django user that will not be saved to database | # create a Django user that will not be saved to database | ||||
user = OIDCUser(id=user_id, | user = OIDCUser(id=user_id, | ||||
username=userinfo['preferred_username'], | username=decoded_token['preferred_username'], | ||||
password='', | password='', | ||||
first_name=userinfo['given_name'], | first_name=decoded_token['given_name'], | ||||
last_name=userinfo['family_name'], | last_name=decoded_token['family_name'], | ||||
email=userinfo['email']) | email=decoded_token['email']) | ||||
# set is_staff user property based on groups | # set is_staff user property based on groups | ||||
user.is_staff = '/staff' in userinfo['groups'] | if 'groups' in decoded_token: | ||||
user.is_staff = '/staff' in decoded_token['groups'] | |||||
# add userinfo sub to custom User proxy model | # add user sub to custom User proxy model | ||||
user.sub = userinfo['sub'] | user.sub = decoded_token['sub'] | ||||
return user | return user | ||||
def _oidc_user_from_profile(oidc_profile: Dict[str, Any], | def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser: | ||||
userinfo: Optional[Dict[str, Any]] = None | |||||
) -> Tuple[OIDCUser, Dict[str, Any]]: | |||||
# get access token | |||||
access_token = oidc_profile['access_token'] | |||||
# request OIDC userinfo | |||||
if userinfo is None: | |||||
userinfo = _oidc_client.userinfo(access_token) | |||||
# create OIDCUser from userinfo | |||||
user = _oidc_user_from_info(userinfo) | |||||
# decode JWT token | # decode JWT token | ||||
decoded_token = _oidc_client.decode_token(access_token) | decoded_token = _oidc_client.decode_token(oidc_profile['access_token']) | ||||
# create OIDCUser from decoded token | |||||
user = _oidc_user_from_decoded_token(decoded_token) | |||||
# get authentication init datetime | # get authentication init datetime | ||||
auth_datetime = datetime.fromtimestamp(decoded_token['auth_time']) | auth_datetime = datetime.fromtimestamp(decoded_token['auth_time']) | ||||
exp_datetime = datetime.fromtimestamp(decoded_token['exp']) | |||||
# compute OIDC tokens expiration date | # compute OIDC tokens expiration date | ||||
oidc_profile['access_expiration'] = ( | oidc_profile['expires_at'] = exp_datetime | ||||
auth_datetime + | oidc_profile['refresh_expires_at'] = ( | ||||
timedelta(seconds=oidc_profile['expires_in'])) | |||||
oidc_profile['refresh_expiration'] = ( | |||||
auth_datetime + | auth_datetime + | ||||
timedelta(seconds=oidc_profile['refresh_expires_in'])) | timedelta(seconds=oidc_profile['refresh_expires_in'])) | ||||
# add OIDC profile data to custom User proxy model | # add OIDC profile data to custom User proxy model | ||||
for key, val in oidc_profile.items(): | for key, val in oidc_profile.items(): | ||||
if hasattr(user, key): | if hasattr(user, key): | ||||
setattr(user, key, val) | setattr(user, key, val) | ||||
return user, userinfo | return user | ||||
class OIDCAuthorizationCodePKCEBackend: | class OIDCAuthorizationCodePKCEBackend: | ||||
def authenticate(self, request: HttpRequest, code: str, code_verifier: str, | def authenticate(self, request: HttpRequest, code: str, code_verifier: str, | ||||
redirect_uri: str) -> Optional[OIDCUser]: | redirect_uri: str) -> Optional[OIDCUser]: | ||||
user = None | user = None | ||||
try: | try: | ||||
# try to authenticate user with OIDC PKCE authorization code flow | # try to authenticate user with OIDC PKCE authorization code flow | ||||
oidc_profile = _oidc_client.authorization_code( | oidc_profile = _oidc_client.authorization_code( | ||||
code, redirect_uri, code_verifier=code_verifier) | code, redirect_uri, code_verifier=code_verifier) | ||||
# create Django user | # create Django user | ||||
user, userinfo = _oidc_user_from_profile(oidc_profile) | user = _oidc_user_from_profile(oidc_profile) | ||||
# save authenticated user data in cache | # set cache key TTL as access token expiration time | ||||
cache.set(f'user_{user.id}', | assert user.expires_at | ||||
{'userinfo': userinfo, 'oidc_profile': oidc_profile}, | ttl = int(user.expires_at.timestamp() - timezone.now().timestamp()) | ||||
timeout=oidc_profile['refresh_expires_in']) | |||||
# save oidc_profile in cache | |||||
cache.set(f'oidc_user_{user.id}', oidc_profile, | |||||
timeout=max(0, ttl)) | |||||
except Exception as e: | except Exception as e: | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
return user | return user | ||||
def get_user(self, user_id: int) -> Optional[OIDCUser]: | def get_user(self, user_id: int) -> Optional[OIDCUser]: | ||||
# get user data from cache | # get oidc profile from cache | ||||
user_oidc_data = cache.get(f'user_{user_id}') | oidc_profile = cache.get(f'oidc_user_{user_id}') | ||||
if user_oidc_data: | if oidc_profile: | ||||
try: | try: | ||||
user, _ = _oidc_user_from_profile( | user = _oidc_user_from_profile(oidc_profile) | ||||
user_oidc_data['oidc_profile'], user_oidc_data['userinfo']) | |||||
# restore auth backend | # restore auth backend | ||||
setattr(user, 'backend', | setattr(user, 'backend', | ||||
f'{__name__}.{self.__class__.__name__}') | f'{__name__}.{self.__class__.__name__}') | ||||
return user | return user | ||||
except Exception as e: | except Exception as e: | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
return None | return None | ||||
else: | else: | ||||
Show All 11 Lines | def authenticate(self, request): | ||||
except ValueError: | except ValueError: | ||||
raise AuthenticationFailed( | raise AuthenticationFailed( | ||||
'Invalid HTTP authorization header format') | 'Invalid HTTP authorization header format') | ||||
if auth_type != 'Bearer': | if auth_type != 'Bearer': | ||||
raise AuthenticationFailed( | raise AuthenticationFailed( | ||||
(f'Invalid or unsupported HTTP authorization' | (f'Invalid or unsupported HTTP authorization' | ||||
f' type ({auth_type}).')) | f' type ({auth_type}).')) | ||||
try: | try: | ||||
# attempt to decode token | # attempt to decode token | ||||
decoded = _oidc_client.decode_token(token) | decoded_token = _oidc_client.decode_token(token) | ||||
userinfo = cache.get(decoded['sub']) | |||||
if userinfo: | |||||
user = _oidc_user_from_info(userinfo) | |||||
else: | |||||
# get OIDC userinfo | |||||
userinfo = _oidc_client.userinfo(token) | |||||
# create Django user | # create Django user | ||||
user = _oidc_user_from_info(userinfo) | user = _oidc_user_from_decoded_token(decoded_token) | ||||
# cache userinfo until token expires | |||||
max_ttl = decoded['exp'] - decoded['auth_time'] | |||||
ttl = decoded['exp'] - int(timezone.now().timestamp()) | |||||
ttl = max(0, min(ttl, max_ttl)) | |||||
cache.set(decoded['sub'], userinfo, timeout=ttl) | |||||
except Exception as e: | except Exception as e: | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
raise AuthenticationFailed(str(e)) | raise AuthenticationFailed(str(e)) | ||||
return user, None | return user, None |