Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345458
D5366.id19298.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
14 KB
Subscribers
None
D5366.id19298.diff
View Options
diff --git a/requirements-django.txt b/requirements-django.txt
--- a/requirements-django.txt
+++ b/requirements-django.txt
@@ -1,3 +1,3 @@
django < 3
+djangorestframework
sentry-sdk
-
diff --git a/swh/auth/django/backends.py b/swh/auth/django/backends.py
--- a/swh/auth/django/backends.py
+++ b/swh/auth/django/backends.py
@@ -3,20 +3,25 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from datetime import datetime
+import hashlib
from typing import Any, Dict, Optional
from django.core.cache import cache
from django.http import HttpRequest
from django.utils import timezone
+from rest_framework.authentication import BaseAuthentication
+from rest_framework.exceptions import AuthenticationFailed, ValidationError
import sentry_sdk
from swh.auth.django.models import OIDCUser
from swh.auth.django.utils import (
keycloak_oidc_client,
oidc_profile_cache_key,
+ oidc_user_from_decoded_token,
oidc_user_from_profile,
)
-from swh.auth.keycloak import KeycloakOpenIDConnect
+from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect
def _update_cached_oidc_profile(
@@ -110,3 +115,87 @@
return None
else:
return None
+
+
+class OIDCBearerTokenAuthentication(BaseAuthentication):
+ """
+ Django REST Framework authentication backend using bearer tokens for
+ Keycloak OpenID Connect.
+
+ It allows a Web API user to authenticate by sending a long-lived
+ OpenID Connect refresh token in HTTP Authorization headers.
+ Long-lived refresh tokens can be generated by opening an OpenID Connect
+ session with the following scope: ``openid offline_access``.
+
+ To use that backend globally in your DRF application, proceed as follows:
+
+ * add ``"swh.auth.django.backends.OIDCBearerTokenAuthentication"``
+ to the ``REST_FRAMEWORK["DEFAULT_AUTHENTICATION_CLASSES"]``
+ django setting.
+
+ * configure Keycloak URL, realm and client by adding
+ ``SWH_AUTH_SERVER_URL``, ``SWH_AUTH_REALM_NAME`` and ``SWH_AUTH_CLIENT_ID``
+ in django settings
+
+ Users will then be able to perform authenticated Web API calls by sending
+ their refresh token in HTTP Authorization headers, for instance:
+ ``curl -H "Authorization: Bearer ${TOKEN}" https://...``.
+
+ """
+
+ def authenticate(self, request):
+ auth_header = request.META.get("HTTP_AUTHORIZATION")
+ if auth_header is None:
+ return None
+
+ try:
+ auth_type, refresh_token = auth_header.split(" ", 1)
+ except ValueError:
+ raise AuthenticationFailed("Invalid HTTP authorization header format")
+
+ if auth_type != "Bearer":
+ raise AuthenticationFailed(
+ (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).")
+ )
+ try:
+
+ oidc_client = keycloak_oidc_client()
+
+ # compute a cache key from the token that does not exceed
+ # memcached key size limit
+ hasher = hashlib.sha1()
+ hasher.update(refresh_token.encode("ascii"))
+ cache_key = f"api_token_{hasher.hexdigest()}"
+
+ # check if an access token is cached
+ access_token = cache.get(cache_key)
+
+ if access_token is not None:
+ # attempt to decode access token
+ try:
+ decoded_token = oidc_client.decode_token(access_token)
+ # access token has expired
+ except ExpiredSignatureError:
+ decoded_token = None
+
+ if access_token is None or decoded_token is None:
+ # get a new access token from authentication provider
+ access_token = oidc_client.refresh_token(refresh_token)["access_token"]
+ # decode access token
+ decoded_token = oidc_client.decode_token(access_token)
+ # compute access token expiration
+ exp = datetime.fromtimestamp(decoded_token["exp"])
+ ttl = int(exp.timestamp() - timezone.now().timestamp())
+ # save access token in cache while it is valid
+ cache.set(cache_key, access_token, timeout=max(0, ttl))
+
+ # create Django user
+ user = oidc_user_from_decoded_token(decoded_token, oidc_client.client_id)
+ except UnicodeEncodeError as e:
+ sentry_sdk.capture_exception(e)
+ raise ValidationError("Invalid bearer token")
+ except Exception as e:
+ sentry_sdk.capture_exception(e)
+ raise AuthenticationFailed(str(e))
+
+ return user, None
diff --git a/swh/auth/tests/conftest.py b/swh/auth/tests/conftest.py
--- a/swh/auth/tests/conftest.py
+++ b/swh/auth/tests/conftest.py
@@ -4,9 +4,22 @@
# See top-level LICENSE file for more information
import pytest
+from rest_framework.test import APIClient, APIRequestFactory
# Alias rf fixture from pytest-django
@pytest.fixture
def request_factory(rf):
return rf
+
+
+# Fixture to get test client from Django REST Framework
+@pytest.fixture(scope="module")
+def api_client():
+ return APIClient()
+
+
+# Fixture to get API request factory from Django REST Framework
+@pytest.fixture(scope="module")
+def api_request_factory():
+ return APIRequestFactory()
diff --git a/swh/auth/tests/django/app/apptest/settings.py b/swh/auth/tests/django/app/apptest/settings.py
--- a/swh/auth/tests/django/app/apptest/settings.py
+++ b/swh/auth/tests/django/app/apptest/settings.py
@@ -39,3 +39,11 @@
SWH_AUTH_SERVER_URL = SERVER_URL
SWH_AUTH_REALM_NAME = REALM_NAME
SWH_AUTH_CLIENT_ID = CLIENT_ID
+
+REST_FRAMEWORK = {
+ "DEFAULT_RENDERER_CLASSES": ("rest_framework.renderers.JSONRenderer",),
+ "DEFAULT_AUTHENTICATION_CLASSES": [
+ "rest_framework.authentication.SessionAuthentication",
+ "swh.auth.django.backends.OIDCBearerTokenAuthentication",
+ ],
+}
diff --git a/swh/auth/tests/django/app/apptest/urls.py b/swh/auth/tests/django/app/apptest/urls.py
--- a/swh/auth/tests/django/app/apptest/urls.py
+++ b/swh/auth/tests/django/app/apptest/urls.py
@@ -5,6 +5,8 @@
from django.conf.urls import url
from django.http import HttpResponse
+from rest_framework.decorators import api_view
+from rest_framework.response import Response
from swh.auth.django.views import urlpatterns as auth_urlpatterns
@@ -13,4 +15,12 @@
return HttpResponse("Hello World !")
-urlpatterns = [url(r"^$", _root_view, name="root")] + auth_urlpatterns
+@api_view()
+def _api_view_test(request):
+ return Response({"message": "Hello World !"})
+
+
+urlpatterns = [
+ url(r"^$", _root_view, name="root"),
+ url(r"^api/test$", _api_view_test, name="api-test"),
+] + auth_urlpatterns
diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py
--- a/swh/auth/tests/django/test_backends.py
+++ b/swh/auth/tests/django/test_backends.py
@@ -9,7 +9,9 @@
from django.conf import settings
from django.contrib.auth import authenticate, get_backends
import pytest
+from rest_framework.exceptions import AuthenticationFailed
+from swh.auth.django.backends import OIDCBearerTokenAuthentication
from swh.auth.django.models import OIDCUser
from swh.auth.django.utils import reverse
from swh.auth.keycloak import ExpiredSignatureError
@@ -149,3 +151,106 @@
assert user.get_group_permissions() == {permission}
assert user.has_module_perms("webapp")
assert not user.has_module_perms("foo")
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_success(keycloak_oidc, api_request_factory):
+ """
+ Checks successful login based on OpenID Connect bearer token Django REST
+ Framework authentication backend (Web API login).
+ """
+ url = reverse("api-test")
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+ access_token = oidc_profile["access_token"]
+
+ decoded_token = keycloak_oidc.decode_token(access_token)
+
+ request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
+
+ user, _ = drf_auth_backend.authenticate(request)
+ _check_authenticated_user(user, decoded_token, keycloak_oidc)
+ # oidc_profile is not filled when authenticating through bearer token
+ assert hasattr(user, "access_token") and user.access_token is None
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_oidc, api_request_factory):
+ """
+ Checks failed login based on OpenID Connect bearer token Django REST
+ Framework authentication backend (Web API login).
+ """
+ url = reverse("api-test")
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ oidc_profile = keycloak_oidc.login()
+
+ # simulate a failed authentication with a bearer token in expected format
+ keycloak_oidc.set_auth_success(False)
+
+ refresh_token = oidc_profile["refresh_token"]
+
+ request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+ # simulate a failed authentication with an invalid bearer token format
+ request = api_request_factory.get(
+ url, HTTP_AUTHORIZATION="Bearer invalid-token-format"
+ )
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+
+def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_oidc, api_request_factory):
+ """
+ Checks failed login based on OpenID Connect bearer token Django REST
+ Framework authentication backend (Web API login) due to invalid
+ authorization header value.
+ """
+ url = reverse("api-test")
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+
+ # Invalid authorization type
+ request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+ # Missing authorization type
+ request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_permissions(
+ keycloak_oidc, api_request_factory
+):
+ """
+ Checks that a permission defined with OpenID Connect is correctly mapped
+ to a Django one when using bearer token authentication.
+ """
+ permission = "webapp.some-permission"
+ keycloak_oidc.user_permissions = [permission]
+
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+ url = reverse("api-test")
+ request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
+ user, _ = drf_auth_backend.authenticate(request)
+
+ assert user.has_perm(permission)
+ assert user.get_all_permissions() == {permission}
+ assert user.get_group_permissions() == {permission}
+ assert user.has_module_perms("webapp")
+ assert not user.has_module_perms("foo")
diff --git a/swh/auth/tests/django/test_drf_bearer_token_auth.py b/swh/auth/tests/django/test_drf_bearer_token_auth.py
new file mode 100644
--- /dev/null
+++ b/swh/auth/tests/django/test_drf_bearer_token_auth.py
@@ -0,0 +1,109 @@
+# Copyright (C) 2020-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from django.contrib.auth.models import AnonymousUser, User
+import pytest
+
+from swh.auth.django.models import OIDCUser
+from swh.auth.django.utils import reverse
+
+
+@pytest.mark.django_db
+def test_drf_django_session_auth_success(keycloak_oidc, client):
+ """
+ Check user gets authenticated when querying the web api
+ through a web browser.
+ """
+ url = reverse("api-test")
+
+ client.login(code="", code_verifier="", redirect_uri="")
+
+ response = client.get(url)
+ assert response.status_code == 200
+ request = response.wsgi_request
+
+ # user should be authenticated
+ assert isinstance(request.user, OIDCUser)
+
+ # check remote user has not been saved to Django database
+ with pytest.raises(User.DoesNotExist):
+ User.objects.get(username=request.user.username)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_success(keycloak_oidc, api_client):
+ """
+ Check user gets authenticated when querying the web api
+ through an HTTP client using bearer token authentication.
+ """
+ url = reverse("api-test")
+
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+
+ api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
+
+ response = api_client.get(url)
+ assert response.status_code == 200
+ request = response.wsgi_request
+
+ # user should be authenticated
+ assert isinstance(request.user, OIDCUser)
+
+ # check remote user has not been saved to Django database
+ with pytest.raises(User.DoesNotExist):
+ User.objects.get(username=request.user.username)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_failure(keycloak_oidc, api_client):
+ url = reverse("api-test")
+
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+
+ # check for failed authentication but with expected token format
+ keycloak_oidc.set_auth_success(False)
+ api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
+
+ response = api_client.get(url)
+ assert response.status_code == 403
+ request = response.wsgi_request
+
+ assert isinstance(request.user, AnonymousUser)
+
+ # check for failed authentication when token format is invalid
+ api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà")
+
+ response = api_client.get(url)
+ assert response.status_code == 400
+ request = response.wsgi_request
+
+ assert isinstance(request.user, AnonymousUser)
+
+
+def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_oidc, api_client):
+ url = reverse("api-test")
+
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+
+ # missing authorization type
+ api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}")
+
+ response = api_client.get(url)
+ assert response.status_code == 403
+ request = response.wsgi_request
+
+ assert isinstance(request.user, AnonymousUser)
+
+ # invalid authorization type
+ api_client.credentials(HTTP_AUTHORIZATION="Foo token")
+
+ response = api_client.get(url)
+ assert response.status_code == 403
+ request = response.wsgi_request
+
+ assert isinstance(request.user, AnonymousUser)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 3:21 PM (5 d, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3218186
Attached To
D5366: django: Add OIDC Bearer Token authentication backend for DRF views
Event Timeline
Log In to Comment