Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345426
D2876.id10249.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Subscribers
None
D2876.id10249.diff
View Options
diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
--- a/swh/web/auth/backends.py
+++ b/swh/web/auth/backends.py
@@ -4,7 +4,7 @@
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
-from typing import Any, Dict, Optional, Tuple
+from typing import Any, Dict, Optional
from django.core.cache import cache
from django.http import HttpRequest
@@ -24,53 +24,44 @@
_oidc_client: KeycloakOpenIDConnect = get_oidc_client()
-def _oidc_user_from_info(userinfo: Dict[str, Any]) -> OIDCUser:
+def _oidc_user_from_decoded_token(decoded_token: Dict[str, Any]) -> OIDCUser:
# compute an integer user identifier for Django User model
# by concatenating all groups of the UUID4 user identifier
# generated by Keycloak and converting it from hex to decimal
- user_id = int(''.join(userinfo['sub'].split('-')), 16)
+ user_id = int(''.join(decoded_token['sub'].split('-')), 16)
# create a Django user that will not be saved to database
user = OIDCUser(id=user_id,
- username=userinfo['preferred_username'],
+ username=decoded_token['preferred_username'],
password='',
- first_name=userinfo['given_name'],
- last_name=userinfo['family_name'],
- email=userinfo['email'])
+ first_name=decoded_token['given_name'],
+ last_name=decoded_token['family_name'],
+ email=decoded_token['email'])
# set is_staff user property based on groups
- user.is_staff = '/staff' in userinfo['groups']
+ user.is_staff = '/staff' in decoded_token['groups']
- # add userinfo sub to custom User proxy model
- user.sub = userinfo['sub']
+ # add user sub to custom User proxy model
+ user.sub = decoded_token['sub']
return user
-def _oidc_user_from_profile(oidc_profile: Dict[str, Any],
- userinfo: Optional[Dict[str, Any]] = None
- ) -> Tuple[OIDCUser, Dict[str, Any]]:
- # get access token
- access_token = oidc_profile['access_token']
+def _oidc_user_from_profile(oidc_profile: Dict[str, Any]) -> OIDCUser:
- # request OIDC userinfo
- if userinfo is None:
- userinfo = _oidc_client.userinfo(access_token)
+ # decode JWT token
+ decoded_token = _oidc_client.decode_token(oidc_profile['access_token'])
- # create OIDCUser from userinfo
+ # create OIDCUser from decoded token
- user = _oidc_user_from_info(userinfo)
-
- # decode JWT token
- decoded_token = _oidc_client.decode_token(access_token)
+ user = _oidc_user_from_decoded_token(decoded_token)
# get authentication init datetime
auth_datetime = datetime.fromtimestamp(decoded_token['auth_time'])
+ exp_datetime = datetime.fromtimestamp(decoded_token['exp'])
# compute OIDC tokens expiration date
- oidc_profile['access_expiration'] = (
- auth_datetime +
- timedelta(seconds=oidc_profile['expires_in']))
- oidc_profile['refresh_expiration'] = (
+ oidc_profile['expires_at'] = exp_datetime
+ oidc_profile['refresh_expires_at'] = (
auth_datetime +
timedelta(seconds=oidc_profile['refresh_expires_in']))
@@ -79,7 +70,7 @@
if hasattr(user, key):
setattr(user, key, val)
- return user, userinfo
+ return user
class OIDCAuthorizationCodePKCEBackend:
@@ -94,24 +85,26 @@
code, redirect_uri, code_verifier=code_verifier)
# create Django user
- user, userinfo = _oidc_user_from_profile(oidc_profile)
+ user = _oidc_user_from_profile(oidc_profile)
- # save authenticated user data in cache
- cache.set(f'user_{user.id}',
- {'userinfo': userinfo, 'oidc_profile': oidc_profile},
- timeout=oidc_profile['refresh_expires_in'])
+ # set cache key TTL as access token expiration time
+ assert user.expires_at
+ ttl = int(user.expires_at.timestamp() - timezone.now().timestamp())
+
+ # save oidc_profile in cache
+ cache.set(f'oidc_user_{user.id}', oidc_profile,
+ timeout=max(0, ttl))
except Exception as e:
sentry_sdk.capture_exception(e)
return user
def get_user(self, user_id: int) -> Optional[OIDCUser]:
- # get user data from cache
- user_oidc_data = cache.get(f'user_{user_id}')
- if user_oidc_data:
+ # get oidc profile from cache
+ oidc_profile = cache.get(f'oidc_user_{user_id}')
+ if oidc_profile:
try:
- user, _ = _oidc_user_from_profile(
- user_oidc_data['oidc_profile'], user_oidc_data['userinfo'])
+ user = _oidc_user_from_profile(oidc_profile)
# restore auth backend
setattr(user, 'backend',
f'{__name__}.{self.__class__.__name__}')
@@ -139,24 +132,11 @@
raise AuthenticationFailed(
(f'Invalid or unsupported HTTP authorization'
f' type ({auth_type}).'))
-
try:
# attempt to decode token
- decoded = _oidc_client.decode_token(token)
- userinfo = cache.get(decoded['sub'])
- if userinfo:
- user = _oidc_user_from_info(userinfo)
- else:
- # get OIDC userinfo
- userinfo = _oidc_client.userinfo(token)
- # create Django user
- user = _oidc_user_from_info(userinfo)
- # cache userinfo until token expires
- max_ttl = decoded['exp'] - decoded['auth_time']
- ttl = decoded['exp'] - int(timezone.now().timestamp())
- ttl = max(0, min(ttl, max_ttl))
- cache.set(decoded['sub'], userinfo, timeout=ttl)
-
+ decoded_token = _oidc_client.decode_token(token)
+ # create Django user
+ user = _oidc_user_from_decoded_token(decoded_token)
except Exception as e:
sentry_sdk.capture_exception(e)
raise AuthenticationFailed(str(e))
diff --git a/swh/web/auth/models.py b/swh/web/auth/models.py
--- a/swh/web/auth/models.py
+++ b/swh/web/auth/models.py
@@ -24,10 +24,10 @@
# OIDC tokens and session related data, only relevant when a user
# authenticates from a web browser
access_token: Optional[str] = None
- access_expiration: Optional[datetime] = None
+ expires_at: Optional[datetime] = None
id_token: Optional[str] = None
refresh_token: Optional[str] = None
- refresh_expiration: Optional[datetime] = None
+ refresh_expires_at: Optional[datetime] = None
scope: Optional[str] = None
session_state: Optional[str] = None
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
--- a/swh/web/auth/views.py
+++ b/swh/web/auth/views.py
@@ -104,7 +104,7 @@
# end OpenID Connect session
oidc_client.logout(refresh_token)
# remove user data from cache
- cache.delete(f'user_{user.id}')
+ cache.delete(f'oidc_user_{user.id}')
logout_url = reverse('logout', query_params={'remote_user': 1})
return HttpResponseRedirect(request.build_absolute_uri(logout_url))
diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py
--- a/swh/web/tests/auth/keycloak_mock.py
+++ b/swh/web/tests/auth/keycloak_mock.py
@@ -22,6 +22,7 @@
super().__init__(swhweb_config['keycloak']['server_url'],
swhweb_config['keycloak']['realm_name'],
OIDC_SWH_WEB_CLIENT_ID)
+ self.auth_success = auth_success
self._keycloak.public_key = lambda: realm_public_key
self._keycloak.well_know = lambda: {
'issuer': f'{self.server_url}realms/{self.realm_name}',
@@ -57,14 +58,17 @@
self.logout.side_effect = exception
def decode_token(self, token):
- # skip signature expiration check as we use a static oidc_profile
- # for the tests with expired tokens in it
- options = {'verify_exp': False}
+ options = {}
+ if self.auth_success:
+ # skip signature expiration check as we use a static oidc_profile
+ # for the tests with expired tokens in it
+ options['verify_exp'] = False
decoded = super().decode_token(token, options)
# tweak auth and exp time for tests
expire_in = decoded['exp'] - decoded['auth_time']
decoded['auth_time'] = int(timezone.now().timestamp())
decoded['exp'] = decoded['auth_time'] + expire_in
+ decoded['groups'] = ['/staff']
return decoded
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
--- a/swh/web/tests/auth/test_backends.py
+++ b/swh/web/tests/auth/test_backends.py
@@ -30,18 +30,17 @@
redirect_uri='https://localhost:5004')
-def _check_authenticated_user(user):
- userinfo = sample_data.userinfo
+def _check_authenticated_user(user, decoded_token):
assert user is not None
assert isinstance(user, OIDCUser)
assert user.id != 0
- assert user.username == userinfo['preferred_username']
+ assert user.username == decoded_token['preferred_username']
assert user.password == ''
- assert user.first_name == userinfo['given_name']
- assert user.last_name == userinfo['family_name']
- assert user.email == userinfo['email']
- assert user.is_staff == ('/staff' in userinfo['groups'])
- assert user.sub == userinfo['sub']
+ assert user.first_name == decoded_token['given_name']
+ assert user.last_name == decoded_token['family_name']
+ assert user.email == decoded_token['email']
+ assert user.is_staff == ('/staff' in decoded_token['groups'])
+ assert user.sub == decoded_token['sub']
@pytest.mark.django_db
@@ -50,22 +49,19 @@
oidc_profile = sample_data.oidc_profile
user = _authenticate_user(request_factory)
- _check_authenticated_user(user)
+ decoded_token = kc_oidc_mock.decode_token(user.access_token)
+ _check_authenticated_user(user, decoded_token)
- decoded_token = kc_oidc_mock.decode_token(
- sample_data.oidc_profile['access_token'])
auth_datetime = datetime.fromtimestamp(decoded_token['auth_time'])
-
- access_expiration = (
- auth_datetime + timedelta(seconds=oidc_profile['expires_in']))
- refresh_expiration = (
+ exp_datetime = datetime.fromtimestamp(decoded_token['exp'])
+ refresh_exp_datetime = (
auth_datetime + timedelta(seconds=oidc_profile['refresh_expires_in']))
assert user.access_token == oidc_profile['access_token']
- assert user.access_expiration == access_expiration
+ assert user.expires_at == exp_datetime
assert user.id_token == oidc_profile['id_token']
assert user.refresh_token == oidc_profile['refresh_token']
- assert user.refresh_expiration == refresh_expiration
+ assert user.refresh_expires_at == refresh_exp_datetime
assert user.scope == oidc_profile['scope']
assert user.session_state == oidc_profile['session_state']
@@ -93,25 +89,16 @@
kc_oidc_mock = mock_keycloak(mocker)
access_token = sample_data.oidc_profile['access_token']
+ decoded_token = kc_oidc_mock.decode_token(access_token)
request = api_request_factory.get(
url, HTTP_AUTHORIZATION=f"Bearer {access_token}")
- # first authentication
user, _ = drf_auth_backend.authenticate(request)
- _check_authenticated_user(user)
+ _check_authenticated_user(user, decoded_token)
# oidc_profile is not filled when authenticating through bearer token
assert hasattr(user, 'access_token') and user.access_token is None
- # second authentication, should fetch userinfo from cache
- # until token expires
- user, _ = drf_auth_backend.authenticate(request)
- _check_authenticated_user(user)
- assert hasattr(user, 'access_token') and user.access_token is None
-
- # check user request to keycloak has been sent only once
- kc_oidc_mock.userinfo.assert_called_once_with(access_token)
-
@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_backend_failure(mocker,
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 3:21 PM (5 d, 19 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226364
Attached To
D2876: auth/backends: Simplify and improve OIDC authentication
Event Timeline
Log In to Comment