diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py index b6b8add0..585724d6 100644 --- a/swh/web/auth/backends.py +++ b/swh/web/auth/backends.py @@ -1,164 +1,164 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta from typing import Any, Dict, Optional, Tuple from django.core.cache import cache from django.http import HttpRequest from django.utils import timezone from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed import sentry_sdk from swh.web.auth.keycloak import KeycloakOpenIDConnect from swh.web.auth.utils import get_oidc_client from swh.web.auth.models import OIDCUser # OpenID Connect client to communicate with Keycloak server _oidc_client: KeycloakOpenIDConnect = get_oidc_client() def _oidc_user_from_info(userinfo: Dict[str, Any]) -> OIDCUser: # compute an integer user identifier for Django User model # by concatenating all groups of the UUID4 user identifier # generated by Keycloak and converting it from hex to decimal user_id = int(''.join(userinfo['sub'].split('-')), 16) # create a Django user that will not be saved to database user = OIDCUser(id=user_id, username=userinfo['preferred_username'], password='', first_name=userinfo['given_name'], last_name=userinfo['family_name'], email=userinfo['email']) # set is_staff user property based on groups user.is_staff = '/staff' in userinfo['groups'] # add userinfo sub to custom User proxy model user.sub = userinfo['sub'] return user def _oidc_user_from_profile(oidc_profile: Dict[str, Any], userinfo: Optional[Dict[str, Any]] = None ) -> Tuple[OIDCUser, Dict[str, Any]]: # get access token access_token = oidc_profile['access_token'] # request OIDC userinfo if userinfo is None: userinfo = _oidc_client.userinfo(access_token) # create OIDCUser from userinfo user = _oidc_user_from_info(userinfo) # decode JWT token decoded_token = _oidc_client.decode_token(access_token) # get authentication init datetime auth_datetime = datetime.fromtimestamp(decoded_token['auth_time']) # compute OIDC tokens expiration date oidc_profile['access_expiration'] = ( auth_datetime + timedelta(seconds=oidc_profile['expires_in'])) oidc_profile['refresh_expiration'] = ( auth_datetime + timedelta(seconds=oidc_profile['refresh_expires_in'])) # add OIDC profile data to custom User proxy model for key, val in oidc_profile.items(): if hasattr(user, key): setattr(user, key, val) return user, userinfo class OIDCAuthorizationCodePKCEBackend: def authenticate(self, request: HttpRequest, code: str, code_verifier: str, redirect_uri: str) -> Optional[OIDCUser]: user = None try: # try to authenticate user with OIDC PKCE authorization code flow oidc_profile = _oidc_client.authorization_code( code, redirect_uri, code_verifier=code_verifier) # create Django user user, userinfo = _oidc_user_from_profile(oidc_profile) # save authenticated user data in cache cache.set(f'user_{user.id}', {'userinfo': userinfo, 'oidc_profile': oidc_profile}, timeout=oidc_profile['refresh_expires_in']) except Exception as e: sentry_sdk.capture_exception(e) return user def get_user(self, user_id: int) -> Optional[OIDCUser]: # get user data from cache user_oidc_data = cache.get(f'user_{user_id}') if user_oidc_data: try: user, _ = _oidc_user_from_profile( user_oidc_data['oidc_profile'], user_oidc_data['userinfo']) # restore auth backend setattr(user, 'backend', f'{__name__}.{self.__class__.__name__}') return user except Exception as e: sentry_sdk.capture_exception(e) return None else: return None class OIDCBearerTokenAuthentication(BaseAuthentication): def authenticate(self, request): auth_header = request.META.get('HTTP_AUTHORIZATION') if auth_header is None: return None try: auth_type, token = auth_header.split(' ', 1) except ValueError: raise AuthenticationFailed( 'Invalid HTTP authorization header format') if auth_type != 'Bearer': raise AuthenticationFailed( (f'Invalid or unsupported HTTP authorization' f' type ({auth_type}).')) try: # attempt to decode token decoded = _oidc_client.decode_token(token) userinfo = cache.get(decoded['sub']) if userinfo: user = _oidc_user_from_info(userinfo) else: # get OIDC userinfo userinfo = _oidc_client.userinfo(token) # create Django user user = _oidc_user_from_info(userinfo) # cache userinfo until token expires - max_ttl = decoded['exp'] - decoded['auth_time'] - 1 - ttl = decoded['exp'] - int(timezone.now().timestamp()) - 1 + max_ttl = decoded['exp'] - decoded['auth_time'] + ttl = decoded['exp'] - int(timezone.now().timestamp()) ttl = max(0, min(ttl, max_ttl)) cache.set(decoded['sub'], userinfo, timeout=ttl) except Exception as e: sentry_sdk.capture_exception(e) raise AuthenticationFailed(str(e)) return user, None