Page Menu | Home | Software Heritage

D2876.id10249.diff
No One | Temporary

D2876.id10249.diff

diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
--- a/swh/web/auth/backends.py
+++ b/swh/web/auth/backends.py
@@ -4,7 +4,7 @@
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
-from typing import Any, Dict, Optional, Tuple
+from typing import Any, Dict, Optional
from django.core.cache import cache
from django.http import HttpRequest
@@ -24,53 +24,44 @@
_oidc_client: KeycloakOpenIDConnect = get_oidc_client()
-def _oidc_user_from_info(userinfo: Dict[str, Any]) -> OIDCUser:
+def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser:
# compute an integer user identifier for Django User model
# by concatenating all groups of the UUID4 user identifier
# generated by Keycloak and converting it from hex to decimal
- user_id = int(''.join(userinfo['sub'].split('-')), 16)
+ user_id = int(''.join(decoded_token['sub'].split('-')), 16)
# create a Django user that will not be saved to database
user = OIDCUser(id=user_id,
- username=userinfo['preferred_username'],
+ username=decoded_token['preferred_username'],
password='',
- first_name=userinfo['given_name'],
- last_name=userinfo['family_name'],
- email=userinfo['email'])
+ first_name=decoded_token['given_name'],
+ last_name=decoded_token['family_name'],
+ email=decoded_token['email'])
# set is_staff user property based on groups
- user.is_staff = '/staff' in userinfo['groups']
+ user.is_staff = '/staff' in decoded_token['groups']
- # add userinfo sub to custom User proxy model
- user.sub = userinfo['sub']
+ # add user sub to custom User proxy model
+ user.sub = decoded_token['sub']
return user
-def _oidc_user_from_profile(oidc_profile: Dict[str, Any],
- userinfo: Optional[Dict[str, Any]] = None
- ) -> Tuple[OIDCUser, Dict[str, Any]]:
- # get access token
- access_token = oidc_profile['access_token']
+def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser:
- # request OIDC userinfo
- if userinfo is None:
- userinfo = _oidc_client.userinfo(access_token)
+ # decode JWT token
+ decoded_token = _oidc_client.decode_token(oidc_profile['access_token'])
# create OIDCUser from decoded token
- user = _oidc_user_from_info(userinfo)
-
- # decode JWT token
- decoded_token = _oidc_client.decode_token(access_token)
+ user = _oidc_user_from_decoded_token(decoded_token)
# get authentication init datetime
auth_datetime = datetime.fromtimestamp(decoded_token['auth_time'])
+ exp_datetime = datetime.fromtimestamp(decoded_token['exp'])
# compute OIDC tokens expiration date
- oidc_profile['access_expiration'] = (
- auth_datetime +
- timedelta(seconds=oidc_profile['expires_in']))
- oidc_profile['refresh_expiration'] = (
+ oidc_profile['expires_at'] = exp_datetime
+ oidc_profile['refresh_expires_at'] = (
auth_datetime +
timedelta(seconds=oidc_profile['refresh_expires_in']))
@@ -79,7 +70,7 @@
if hasattr(user, key):
setattr(user, key, val)
- return user, userinfo
+ return user
class OIDCAuthorizationCodePKCEBackend:
@@ -94,24 +85,26 @@
code, redirect_uri, code_verifier=code_verifier)
# create Django user
- user, userinfo = _oidc_user_from_profile(oidc_profile)
+ user = _oidc_user_from_profile(oidc_profile)
- # save authenticated user data in cache
- cache.set(f'user_{user.id}',
- {'userinfo': userinfo, 'oidc_profile': oidc_profile},
- timeout=oidc_profile['refresh_expires_in'])
+ # set cache key TTL as access token expiration time
+ assert user.expires_at
+ ttl = int(user.expires_at.timestamp() - timezone.now().timestamp())
+
+ # save oidc_profile in cache
+ cache.set(f'oidc_user_{user.id}', oidc_profile,
+ timeout=max(0, ttl))
except Exception as e:
sentry_sdk.capture_exception(e)
return user
def get_user(self, user_id: int) -> Optional[OIDCUser]:
- # get user data from cache
- user_oidc_data = cache.get(f'user_{user_id}')
- if user_oidc_data:
+ # get oidc profile from cache
+ oidc_profile = cache.get(f'oidc_user_{user_id}')
+ if oidc_profile:
try:
- user, _ = _oidc_user_from_profile(
- user_oidc_data['oidc_profile'], user_oidc_data['userinfo'])
+ user = _oidc_user_from_profile(oidc_profile)
# restore auth backend
setattr(user, 'backend',
f'{__name__}.{self.__class__.__name__}')
@@ -139,24 +132,11 @@
raise AuthenticationFailed(
(f'Invalid or unsupported HTTP authorization'
f' type ({auth_type}).'))
-
try:
# attempt to decode token
- decoded = _oidc_client.decode_token(token)
- userinfo = cache.get(decoded['sub'])
- if userinfo:
- user = _oidc_user_from_info(userinfo)
- else:
- # get OIDC userinfo
- userinfo = _oidc_client.userinfo(token)
- # create Django user
- user = _oidc_user_from_info(userinfo)
- # cache userinfo until token expires
- max_ttl = decoded['exp'] - decoded['auth_time']
- ttl = decoded['exp'] - int(timezone.now().timestamp())
- ttl = max(0, min(ttl, max_ttl))
- cache.set(decoded['sub'], userinfo, timeout=ttl)
-
+ decoded_token = _oidc_client.decode_token(token)
+ # create Django user
+ user = _oidc_user_from_decoded_token(decoded_token)
except Exception as e:
sentry_sdk.capture_exception(e)
raise AuthenticationFailed(str(e))
diff --git a/swh/web/auth/models.py b/swh/web/auth/models.py
--- a/swh/web/auth/models.py
+++ b/swh/web/auth/models.py
@@ -24,10 +24,10 @@
# OIDC tokens and session related data, only relevant when a user
# authenticates from a web browser
access_token: Optional[str] = None
- access_expiration: Optional[datetime] = None
+ expires_at: Optional[datetime] = None
id_token: Optional[str] = None
refresh_token: Optional[str] = None
- refresh_expiration: Optional[datetime] = None
+ refresh_expires_at: Optional[datetime] = None
scope: Optional[str] = None
session_state: Optional[str] = None
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
--- a/swh/web/auth/views.py
+++ b/swh/web/auth/views.py
@@ -104,7 +104,7 @@
# end OpenID Connect session
oidc_client.logout(refresh_token)
# remove user data from cache
- cache.delete(f'user_{user.id}')
+ cache.delete(f'oidc_user_{user.id}')
logout_url = reverse('logout', query_params={'remote_user': 1})
return HttpResponseRedirect(request.build_absolute_uri(logout_url))
diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py
--- a/swh/web/tests/auth/keycloak_mock.py
+++ b/swh/web/tests/auth/keycloak_mock.py
@@ -22,6 +22,7 @@
super().__init__(swhweb_config['keycloak']['server_url'],
swhweb_config['keycloak']['realm_name'],
OIDC_SWH_WEB_CLIENT_ID)
+ self.auth_success = auth_success
self._keycloak.public_key = lambda: realm_public_key
self._keycloak.well_know = lambda: {
'issuer': f'{self.server_url}realms/{self.realm_name}',
@@ -57,14 +58,17 @@
self.logout.side_effect = exception
def decode_token(self, token):
- # skip signature expiration check as we use a static oidc_profile
- # for the tests with expired tokens in it
- options = {'verify_exp': False}
+ options = {}
+ if self.auth_success:
+ # skip signature expiration check as we use a static oidc_profile
+ # for the tests with expired tokens in it
+ options['verify_exp'] = False
decoded = super().decode_token(token, options)
# tweak auth and exp time for tests
expire_in = decoded['exp'] - decoded['auth_time']
decoded['auth_time'] = int(timezone.now().timestamp())
decoded['exp'] = decoded['auth_time'] + expire_in
+ decoded['groups'] = ['/staff']
return decoded
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
--- a/swh/web/tests/auth/test_backends.py
+++ b/swh/web/tests/auth/test_backends.py
@@ -30,18 +30,17 @@
redirect_uri='https://localhost:5004')
-def _check_authenticated_user(user):
- userinfo = sample_data.userinfo
+def _check_authenticated_user(user, decoded_token):
assert user is not None
assert isinstance(user, OIDCUser)
assert user.id != 0
- assert user.username == userinfo['preferred_username']
+ assert user.username == decoded_token['preferred_username']
assert user.password == ''
- assert user.first_name == userinfo['given_name']
- assert user.last_name == userinfo['family_name']
- assert user.email == userinfo['email']
- assert user.is_staff == ('/staff' in userinfo['groups'])
- assert user.sub == userinfo['sub']
+ assert user.first_name == decoded_token['given_name']
+ assert user.last_name == decoded_token['family_name']
+ assert user.email == decoded_token['email']
+ assert user.is_staff == ('/staff' in decoded_token['groups'])
+ assert user.sub == decoded_token['sub']
@pytest.mark.django_db
@@ -50,22 +49,19 @@
oidc_profile = sample_data.oidc_profile
user = _authenticate_user(request_factory)
- _check_authenticated_user(user)
+ decoded_token = kc_oidc_mock.decode_token(user.access_token)
+ _check_authenticated_user(user, decoded_token)
- decoded_token = kc_oidc_mock.decode_token(
- sample_data.oidc_profile['access_token'])
auth_datetime = datetime.fromtimestamp(decoded_token['auth_time'])
-
- access_expiration = (
- auth_datetime + timedelta(seconds=oidc_profile['expires_in']))
- refresh_expiration = (
+ exp_datetime = datetime.fromtimestamp(decoded_token['exp'])
+ refresh_exp_datetime = (
auth_datetime + timedelta(seconds=oidc_profile['refresh_expires_in']))
assert user.access_token == oidc_profile['access_token']
- assert user.access_expiration == access_expiration
+ assert user.expires_at == exp_datetime
assert user.id_token == oidc_profile['id_token']
assert user.refresh_token == oidc_profile['refresh_token']
- assert user.refresh_expiration == refresh_expiration
+ assert user.refresh_expires_at == refresh_exp_datetime
assert user.scope == oidc_profile['scope']
assert user.session_state == oidc_profile['session_state']
@@ -93,25 +89,16 @@
kc_oidc_mock = mock_keycloak(mocker)
access_token = sample_data.oidc_profile['access_token']
+ decoded_token = kc_oidc_mock.decode_token(access_token)
request = api_request_factory.get(
url, HTTP_AUTHORIZATION=f"Bearer {access_token}")
- # first authentication
user, _ = drf_auth_backend.authenticate(request)
- _check_authenticated_user(user)
+ _check_authenticated_user(user, decoded_token)
# oidc_profile is not filled when authenticating through bearer token
assert hasattr(user, 'access_token') and user.access_token is None
- # second authentication, should fetch userinfo from cache
- # until token expires
- user, _ = drf_auth_backend.authenticate(request)
- _check_authenticated_user(user)
- assert hasattr(user, 'access_token') and user.access_token is None
-
- # check user request to keycloak has been sent only once
- kc_oidc_mock.userinfo.assert_called_once_with(access_token)
-
@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_backend_failure(mocker,

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:21 PM (5 d, 19 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226364

Event Timeline