Page MenuHomeSoftware Heritage

D5366.id19298.diff
No OneTemporary

D5366.id19298.diff

diff --git a/requirements-django.txt b/requirements-django.txt
--- a/requirements-django.txt
+++ b/requirements-django.txt
@@ -1,3 +1,3 @@
django < 3
+djangorestframework
sentry-sdk
-
diff --git a/swh/auth/django/backends.py b/swh/auth/django/backends.py
--- a/swh/auth/django/backends.py
+++ b/swh/auth/django/backends.py
@@ -3,20 +3,25 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from datetime import datetime
+import hashlib
from typing import Any, Dict, Optional
from django.core.cache import cache
from django.http import HttpRequest
from django.utils import timezone
+from rest_framework.authentication import BaseAuthentication
+from rest_framework.exceptions import AuthenticationFailed, ValidationError
import sentry_sdk
from swh.auth.django.models import OIDCUser
from swh.auth.django.utils import (
keycloak_oidc_client,
oidc_profile_cache_key,
+ oidc_user_from_decoded_token,
oidc_user_from_profile,
)
-from swh.auth.keycloak import KeycloakOpenIDConnect
+from swh.auth.keycloak import ExpiredSignatureError, KeycloakOpenIDConnect
def _update_cached_oidc_profile(
@@ -110,3 +115,87 @@
return None
else:
return None
+
+
+class OIDCBearerTokenAuthentication(BaseAuthentication):
+ """
+ Django REST Framework authentication backend using bearer tokens for
+ Keycloak OpenID Connect.
+
+ It enables to authenticate a Web API user by sending a long-lived
+ OpenID Connect refresh token in HTTP Authorization headers.
+ Long lived refresh tokens can be generated by opening an OpenID Connect
+ session with the following scope: ``openid offline_access``.
+
+ To use that backend globally in your DRF application, proceed as follow:
+
+ * add ``"swh.auth.django.backends.OIDCBearerTokenAuthentication"``
+ to the ``REST_FRAMEWORK["DEFAULT_AUTHENTICATION_CLASSES"]``
+ django setting.
+
+ * configure Keycloak URL, realm and client by adding
+ ``SWH_AUTH_SERVER_URL``, ``SWH_AUTH_REALM_NAME`` and ``SWH_AUTH_CLIENT_ID``
+ in django settings
+
+ Users will then be able to perform authenticated Web API calls by sending
+ their refresh token in HTTP Authorization headers, for instance:
+ ``curl -H "Authorization: Bearer ${TOKEN}" https://...``.
+
+ """
+
+ def authenticate(self, request):
+ auth_header = request.META.get("HTTP_AUTHORIZATION")
+ if auth_header is None:
+ return None
+
+ try:
+ auth_type, refresh_token = auth_header.split(" ", 1)
+ except ValueError:
+ raise AuthenticationFailed("Invalid HTTP authorization header format")
+
+ if auth_type != "Bearer":
+ raise AuthenticationFailed(
+ (f"Invalid or unsupported HTTP authorization" f" type ({auth_type}).")
+ )
+ try:
+
+ oidc_client = keycloak_oidc_client()
+
+ # compute a cache key from the token that does not exceed
+ # memcached key size limit
+ hasher = hashlib.sha1()
+ hasher.update(refresh_token.encode("ascii"))
+ cache_key = f"api_token_{hasher.hexdigest()}"
+
+ # check if an access token is cached
+ access_token = cache.get(cache_key)
+
+ if access_token is not None:
+ # attempt to decode access token
+ try:
+ decoded_token = oidc_client.decode_token(access_token)
+ # access token has expired
+ except ExpiredSignatureError:
+ decoded_token = None
+
+ if access_token is None or decoded_token is None:
+ # get a new access token from authentication provider
+ access_token = oidc_client.refresh_token(refresh_token)["access_token"]
+ # decode access token
+ decoded_token = oidc_client.decode_token(access_token)
+ # compute access token expiration
+ exp = datetime.fromtimestamp(decoded_token["exp"])
+ ttl = int(exp.timestamp() - timezone.now().timestamp())
+ # save access token in cache while it is valid
+ cache.set(cache_key, access_token, timeout=max(0, ttl))
+
+ # create Django user
+ user = oidc_user_from_decoded_token(decoded_token, oidc_client.client_id)
+ except UnicodeEncodeError as e:
+ sentry_sdk.capture_exception(e)
+ raise ValidationError("Invalid bearer token")
+ except Exception as e:
+ sentry_sdk.capture_exception(e)
+ raise AuthenticationFailed(str(e))
+
+ return user, None
diff --git a/swh/auth/tests/conftest.py b/swh/auth/tests/conftest.py
--- a/swh/auth/tests/conftest.py
+++ b/swh/auth/tests/conftest.py
@@ -4,9 +4,22 @@
# See top-level LICENSE file for more information
import pytest
+from rest_framework.test import APIClient, APIRequestFactory
# Alias rf fixture from pytest-django
@pytest.fixture
def request_factory(rf):
return rf
+
+
+# Fixture to get test client from Django REST Framework
+@pytest.fixture(scope="module")
+def api_client():
+ return APIClient()
+
+
+# Fixture to get API request factory from Django REST Framework
+@pytest.fixture(scope="module")
+def api_request_factory():
+ return APIRequestFactory()
diff --git a/swh/auth/tests/django/app/apptest/settings.py b/swh/auth/tests/django/app/apptest/settings.py
--- a/swh/auth/tests/django/app/apptest/settings.py
+++ b/swh/auth/tests/django/app/apptest/settings.py
@@ -39,3 +39,11 @@
SWH_AUTH_SERVER_URL = SERVER_URL
SWH_AUTH_REALM_NAME = REALM_NAME
SWH_AUTH_CLIENT_ID = CLIENT_ID
+
+REST_FRAMEWORK = {
+ "DEFAULT_RENDERER_CLASSES": ("rest_framework.renderers.JSONRenderer",),
+ "DEFAULT_AUTHENTICATION_CLASSES": [
+ "rest_framework.authentication.SessionAuthentication",
+ "swh.auth.django.backends.OIDCBearerTokenAuthentication",
+ ],
+}
diff --git a/swh/auth/tests/django/app/apptest/urls.py b/swh/auth/tests/django/app/apptest/urls.py
--- a/swh/auth/tests/django/app/apptest/urls.py
+++ b/swh/auth/tests/django/app/apptest/urls.py
@@ -5,6 +5,8 @@
from django.conf.urls import url
from django.http import HttpResponse
+from rest_framework.decorators import api_view
+from rest_framework.response import Response
from swh.auth.django.views import urlpatterns as auth_urlpatterns
@@ -13,4 +15,12 @@
return HttpResponse("Hello World !")
-urlpatterns = [url(r"^$", _root_view, name="root")] + auth_urlpatterns
+@api_view()
+def _api_view_test(request):
+ return Response({"message": "Hello World !"})
+
+
+urlpatterns = [
+ url(r"^$", _root_view, name="root"),
+ url(r"^api/test$", _api_view_test, name="api-test"),
+] + auth_urlpatterns
diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py
--- a/swh/auth/tests/django/test_backends.py
+++ b/swh/auth/tests/django/test_backends.py
@@ -9,7 +9,9 @@
from django.conf import settings
from django.contrib.auth import authenticate, get_backends
import pytest
+from rest_framework.exceptions import AuthenticationFailed
+from swh.auth.django.backends import OIDCBearerTokenAuthentication
from swh.auth.django.models import OIDCUser
from swh.auth.django.utils import reverse
from swh.auth.keycloak import ExpiredSignatureError
@@ -149,3 +151,106 @@
assert user.get_group_permissions() == {permission}
assert user.has_module_perms("webapp")
assert not user.has_module_perms("foo")
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_success(keycloak_oidc, api_request_factory):
+ """
+ Checks successful login based on OpenID Connect bearer token Django REST
+ Framework authentication backend (Web API login).
+ """
+ url = reverse("api-test")
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+ access_token = oidc_profile["access_token"]
+
+ decoded_token = keycloak_oidc.decode_token(access_token)
+
+ request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
+
+ user, _ = drf_auth_backend.authenticate(request)
+ _check_authenticated_user(user, decoded_token, keycloak_oidc)
+ # oidc_profile is not filled when authenticating through bearer token
+ assert hasattr(user, "access_token") and user.access_token is None
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_failure(keycloak_oidc, api_request_factory):
+ """
+ Checks failed login based on OpenID Connect bearer token Django REST
+ Framework authentication backend (Web API login).
+ """
+ url = reverse("api-test")
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ oidc_profile = keycloak_oidc.login()
+
+ # simulate a failed authentication with a bearer token in expected format
+ keycloak_oidc.set_auth_success(False)
+
+ refresh_token = oidc_profile["refresh_token"]
+
+ request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+ # simulate a failed authentication with an invalid bearer token format
+ request = api_request_factory.get(
+ url, HTTP_AUTHORIZATION="Bearer invalid-token-format"
+ )
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+
+def test_drf_oidc_auth_invalid_or_missing_auth_type(keycloak_oidc, api_request_factory):
+ """
+ Checks failed login based on OpenID Connect bearer token Django REST
+ Framework authentication backend (Web API login) due to invalid
+ authorization header value.
+ """
+ url = reverse("api-test")
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+
+ # Invalid authorization type
+ request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+ # Missing authorization type
+ request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"{refresh_token}")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_permissions(
+ keycloak_oidc, api_request_factory
+):
+ """
+ Checks that a permission defined with OpenID Connect is correctly mapped
+ to a Django one when using bearer token authentication.
+ """
+ permission = "webapp.some-permission"
+ keycloak_oidc.user_permissions = [permission]
+
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+ url = reverse("api-test")
+ request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
+ user, _ = drf_auth_backend.authenticate(request)
+
+ assert user.has_perm(permission)
+ assert user.get_all_permissions() == {permission}
+ assert user.get_group_permissions() == {permission}
+ assert user.has_module_perms("webapp")
+ assert not user.has_module_perms("foo")
diff --git a/swh/auth/tests/django/test_drf_bearer_token_auth.py b/swh/auth/tests/django/test_drf_bearer_token_auth.py
new file mode 100644
--- /dev/null
+++ b/swh/auth/tests/django/test_drf_bearer_token_auth.py
@@ -0,0 +1,109 @@
+# Copyright (C) 2020-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from django.contrib.auth.models import AnonymousUser, User
+import pytest
+
+from swh.auth.django.models import OIDCUser
+from swh.auth.django.utils import reverse
+
+
+@pytest.mark.django_db
+def test_drf_django_session_auth_success(keycloak_oidc, client):
+ """
+ Check user gets authenticated when querying the web api
+ through a web browser.
+ """
+ url = reverse("api-test")
+
+ client.login(code="", code_verifier="", redirect_uri="")
+
+ response = client.get(url)
+ assert response.status_code == 200
+ request = response.wsgi_request
+
+ # user should be authenticated
+ assert isinstance(request.user, OIDCUser)
+
+ # check remoter used has not been saved to Django database
+ with pytest.raises(User.DoesNotExist):
+ User.objects.get(username=request.user.username)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_success(keycloak_oidc, api_client):
+ """
+ Check user gets authenticated when querying the web api
+ through an HTTP client using bearer token authentication.
+ """
+ url = reverse("api-test")
+
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+
+ api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
+
+ response = api_client.get(url)
+ assert response.status_code == 200
+ request = response.wsgi_request
+
+ # user should be authenticated
+ assert isinstance(request.user, OIDCUser)
+
+ # check remoter used has not been saved to Django database
+ with pytest.raises(User.DoesNotExist):
+ User.objects.get(username=request.user.username)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_failure(keycloak_oidc, api_client):
+ url = reverse("api-test")
+
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+
+ # check for failed authentication but with expected token format
+ keycloak_oidc.set_auth_success(False)
+ api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
+
+ response = api_client.get(url)
+ assert response.status_code == 403
+ request = response.wsgi_request
+
+ assert isinstance(request.user, AnonymousUser)
+
+ # check for failed authentication when token format is invalid
+ api_client.credentials(HTTP_AUTHORIZATION="Bearer invalid-token-format-ééàà")
+
+ response = api_client.get(url)
+ assert response.status_code == 400
+ request = response.wsgi_request
+
+ assert isinstance(request.user, AnonymousUser)
+
+
+def test_drf_oidc_auth_invalid_or_missing_authorization_type(keycloak_oidc, api_client):
+ url = reverse("api-test")
+
+ oidc_profile = keycloak_oidc.login()
+ refresh_token = oidc_profile["refresh_token"]
+
+ # missing authorization type
+ api_client.credentials(HTTP_AUTHORIZATION=f"{refresh_token}")
+
+ response = api_client.get(url)
+ assert response.status_code == 403
+ request = response.wsgi_request
+
+ assert isinstance(request.user, AnonymousUser)
+
+ # invalid authorization type
+ api_client.credentials(HTTP_AUTHORIZATION="Foo token")
+
+ response = api_client.get(url)
+ assert response.status_code == 403
+ request = response.wsgi_request
+
+ assert isinstance(request.user, AnonymousUser)

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:21 PM (5 d, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3218186

Event Timeline