Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9696395
D2747.id9814.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
D2747.id9814.diff
View Options
diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
--- a/swh/web/auth/backends.py
+++ b/swh/web/auth/backends.py
@@ -7,6 +7,14 @@
from typing import Dict, Optional
from django.http import HttpRequest
+from django.conf import settings
+from django.core.cache import cache
+from django.utils import timezone
+
+from rest_framework.authentication import BaseAuthentication
+from rest_framework.exceptions import AuthenticationFailed
+
+import sentry_sdk
from swh.web.auth.utils import get_oidc_client
from swh.web.auth.models import OIDCUser
@@ -103,3 +111,34 @@
code_verifier: Code verifier used for authentication
"""
return _auth_exception[code_verifier]
+
+
+class OIDCBearerTokenAuthentication(BaseAuthentication):
+ def authenticate(self, request):
+ auth_header = request.META.get('HTTP_AUTHORIZATION')
+ if auth_header is None:
+ return None
+
+ auth_header = request.META.get('HTTP_AUTHORIZATION').split()
+ token = auth_header[1] if len(auth_header) == 2 else auth_header[0]
+
+ try:
+ # get OpenID Connect client to communicate with Keycloak server
+ oidc_client = get_oidc_client(settings.OIDC_SWH_WEB_CLIENT_ID)
+ # attempt to decode token
+ decoded = oidc_client.decode_token(token)
+ userinfo = cache.get(decoded['sub'])
+ if userinfo:
+ user = _oidc_user_from_info(userinfo)
+ else:
+ # create Django user
+ user = _create_oidc_user(oidc_client, token)
+ # cache userinfo until token expires
+ ttl = decoded['exp'] - int(timezone.now().timestamp())
+ cache.set(decoded['sub'], user.userinfo, timeout=ttl)
+
+ except Exception as e:
+ sentry_sdk.capture_exception(e)
+ raise AuthenticationFailed(str(e))
+
+ return user, None
diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py
--- a/swh/web/settings/common.py
+++ b/swh/web/settings/common.py
@@ -168,7 +168,11 @@
'DEFAULT_THROTTLE_CLASSES': (
'swh.web.common.throttling.SwhWebRateThrottle',
),
- 'DEFAULT_THROTTLE_RATES': throttle_rates
+ 'DEFAULT_THROTTLE_RATES': throttle_rates,
+ 'DEFAULT_AUTHENTICATION_CLASSES': [
+ 'rest_framework.authentication.SessionAuthentication',
+ 'swh.web.auth.backends.OIDCBearerTokenAuthentication',
+ ],
}
LOGGING = {
diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/test_api_auth.py
@@ -0,0 +1,90 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from django.contrib.auth.models import AnonymousUser, User
+
+from swh.web.auth.models import OIDCUser
+from swh.web.common.utils import reverse
+
+from .keycloak_mock import mock_keycloak
+from .sample_data import oidc_profile
+
+
+@pytest.mark.django_db
+def test_drf_django_session_auth_backend_success(mocker, client):
+ """
+ Check user gets authenticated when querying the web api
+ through a web browser.
+ """
+ url = reverse('api-1-stat-counters')
+
+ mock_keycloak(mocker)
+ client.login(code='', code_verifier='', redirect_uri='')
+
+ response = client.get(url)
+ request = response.wsgi_request
+
+ assert response.status_code == 200
+
+ # user should be authenticated
+ assert isinstance(request.user, OIDCUser)
+
+ # check remote user has not been saved to Django database
+ with pytest.raises(User.DoesNotExist):
+ User.objects.get(username=request.user.username)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_success(mocker, api_client):
+ """
+ Check user gets authenticated when querying the web api
+ through an HTTP client using bearer token authentication.
+ """
+ url = reverse('api-1-stat-counters')
+
+ mock_keycloak(mocker)
+ api_client.credentials(
+ HTTP_AUTHORIZATION=f"Bearer {oidc_profile['access_token']}")
+
+ response = api_client.get(url)
+ request = response.wsgi_request
+
+ assert response.status_code == 200
+
+ # user should be authenticated
+ assert isinstance(request.user, OIDCUser)
+
+ # check remote user has not been saved to Django database
+ with pytest.raises(User.DoesNotExist):
+ User.objects.get(username=request.user.username)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_failure(mocker, api_client):
+ url = reverse('api-1-stat-counters')
+
+ # check for failed authentication but with expected token format
+ mock_keycloak(mocker, auth_success=False)
+ api_client.credentials(
+ HTTP_AUTHORIZATION=f"Bearer {oidc_profile['access_token']}")
+
+ response = api_client.get(url)
+ request = response.wsgi_request
+
+ assert response.status_code == 403
+ assert isinstance(request.user, AnonymousUser)
+
+ # check for failed authentication when token format is invalid
+ mock_keycloak(mocker)
+ api_client.credentials(
+ HTTP_AUTHORIZATION=f"Bearer invalid-token-format")
+
+ response = api_client.get(url)
+ request = response.wsgi_request
+
+ assert response.status_code == 403
+ assert isinstance(request.user, AnonymousUser)
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
--- a/swh/web/tests/auth/test_backends.py
+++ b/swh/web/tests/auth/test_backends.py
@@ -12,6 +12,9 @@
from django.conf import settings
+from rest_framework.exceptions import AuthenticationFailed
+
+from swh.web.auth.backends import OIDCBearerTokenAuthentication
from swh.web.auth.models import OIDCUser
from swh.web.common.utils import reverse
@@ -77,3 +80,56 @@
user = _authenticate_user(request_factory)
assert user is None
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_success(mocker,
+ api_request_factory):
+ url = reverse('api-1-stat-counters')
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ kc_oidc_mock = mock_keycloak(mocker)
+
+ request = api_request_factory.get(
+ url, HTTP_AUTHORIZATION=f"Bearer {oidc_profile['access_token']}")
+
+ # first authentication
+ user, _ = drf_auth_backend.authenticate(request)
+ _check_authenticated_user(user)
+ # oidc_profile is not filled when authenticating through bearer token
+ assert hasattr(user, 'oidc_profile') and not user.oidc_profile
+
+ # second authentication, should fetch userinfo from cache
+ # until token expires
+ user, _ = drf_auth_backend.authenticate(request)
+ _check_authenticated_user(user)
+ assert hasattr(user, 'oidc_profile') and not user.oidc_profile
+
+ # check user request to keycloak has been sent only once
+ kc_oidc_mock.userinfo.assert_called_once_with(oidc_profile['access_token'])
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_failure(mocker,
+ api_request_factory):
+
+ url = reverse('api-1-stat-counters')
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ # simulate a failed authentication with a bearer token in expected format
+ mock_keycloak(mocker, auth_success=False)
+
+ request = api_request_factory.get(
+ url, HTTP_AUTHORIZATION=f"Bearer {oidc_profile['access_token']}")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+ # simulate a failed authentication with an invalid bearer token format
+ mock_keycloak(mocker)
+
+ request = api_request_factory.get(
+ url, HTTP_AUTHORIZATION=f"Bearer invalid-token-format")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Aug 17, 7:59 PM (1 w, 23 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214677
Attached To
D2747: Add DRF bearer token authentication using OpenID Connect
Event Timeline
Log In to Comment