Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9696390
D2747.id10083.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
21 KB
Subscribers
None
D2747.id10083.diff
View Options
diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
--- a/swh/web/auth/backends.py
+++ b/swh/web/auth/backends.py
@@ -8,6 +8,11 @@
from django.core.cache import cache
from django.http import HttpRequest
+from django.utils import timezone
+
+from rest_framework.authentication import BaseAuthentication
+from rest_framework.exceptions import AuthenticationFailed
+
import sentry_sdk
from swh.web.auth.keycloak import KeycloakOpenIDConnect
@@ -116,3 +121,44 @@
return None
else:
return None
+
+
+class OIDCBearerTokenAuthentication(BaseAuthentication):
+ def authenticate(self, request):
+ auth_header = request.META.get('HTTP_AUTHORIZATION')
+ if auth_header is None:
+ return None
+
+ try:
+ auth_type, token = auth_header.split(' ', 1)
+ except ValueError:
+ raise AuthenticationFailed(
+ 'Invalid HTTP authorization header format')
+
+ if auth_type != 'Bearer':
+ raise AuthenticationFailed(
+ (f'Invalid or unsupported HTTP authorization'
+ f' type ({auth_type}).'))
+
+ try:
+ # attempt to decode token
+ decoded = _oidc_client.decode_token(token)
+ userinfo = cache.get(decoded['sub'])
+ if userinfo:
+ user = _oidc_user_from_info(userinfo)
+ else:
+ # get OIDC userinfo
+ userinfo = _oidc_client.userinfo(token)
+ # create Django user
+ user = _oidc_user_from_info(userinfo)
+ # cache userinfo until token expires
+ max_ttl = decoded['exp'] - decoded['auth_time'] - 1
+ ttl = decoded['exp'] - int(timezone.now().timestamp()) - 1
+ ttl = max(0, min(ttl, max_ttl))
+ cache.set(decoded['sub'], userinfo, timeout=ttl)
+
+ except Exception as e:
+ sentry_sdk.capture_exception(e)
+ raise AuthenticationFailed(str(e))
+
+ return user, None
diff --git a/swh/web/misc/urls.py b/swh/web/misc/urls.py
--- a/swh/web/misc/urls.py
+++ b/swh/web/misc/urls.py
@@ -56,7 +56,7 @@
# when running end to end tests trough cypress, declare some extra
# endpoints to provide input data for some of those tests
if get_config()['e2e_tests_mode']:
- from swh.web.tests.data import (
+ from swh.web.tests.views import (
get_content_code_data_by_ext,
get_content_other_data_by_ext,
get_content_code_data_all_exts,
diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py
--- a/swh/web/settings/common.py
+++ b/swh/web/settings/common.py
@@ -170,7 +170,11 @@
'DEFAULT_THROTTLE_CLASSES': (
'swh.web.common.throttling.SwhWebRateThrottle',
),
- 'DEFAULT_THROTTLE_RATES': throttle_rates
+ 'DEFAULT_THROTTLE_RATES': throttle_rates,
+ 'DEFAULT_AUTHENTICATION_CLASSES': [
+ 'rest_framework.authentication.SessionAuthentication',
+ 'swh.web.auth.backends.OIDCBearerTokenAuthentication',
+ ],
}
LOGGING = {
diff --git a/swh/web/tests/auth/test_api_auth.py b/swh/web/tests/auth/test_api_auth.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/test_api_auth.py
@@ -0,0 +1,120 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from django.contrib.auth.models import AnonymousUser, User
+
+from swh.web.auth.models import OIDCUser
+from swh.web.common.utils import reverse
+
+from .keycloak_mock import mock_keycloak
+from . import sample_data
+
+
+@pytest.mark.django_db
+def test_drf_django_session_auth_success(mocker, client):
+ """
+ Check user gets authenticated when querying the web api
+ through a web browser.
+ """
+ url = reverse('api-1-stat-counters')
+
+ mock_keycloak(mocker)
+ client.login(code='', code_verifier='', redirect_uri='')
+
+ response = client.get(url)
+ request = response.wsgi_request
+
+ assert response.status_code == 200
+
+ # user should be authenticated
+ assert isinstance(request.user, OIDCUser)
+
+ # check remote user has not been saved to Django database
+ with pytest.raises(User.DoesNotExist):
+ User.objects.get(username=request.user.username)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_success(mocker, api_client):
+ """
+ Check user gets authenticated when querying the web api
+ through an HTTP client using bearer token authentication.
+ """
+ url = reverse('api-1-stat-counters')
+
+ access_token = sample_data.oidc_profile['access_token']
+
+ mock_keycloak(mocker)
+ api_client.credentials(
+ HTTP_AUTHORIZATION=f"Bearer {access_token}")
+
+ response = api_client.get(url)
+ request = response.wsgi_request
+
+ assert response.status_code == 200
+
+ # user should be authenticated
+ assert isinstance(request.user, OIDCUser)
+
+ # check remote user has not been saved to Django database
+ with pytest.raises(User.DoesNotExist):
+ User.objects.get(username=request.user.username)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_failure(mocker, api_client):
+ url = reverse('api-1-stat-counters')
+
+ access_token = sample_data.oidc_profile['access_token']
+
+ # check for failed authentication but with expected token format
+ mock_keycloak(mocker, auth_success=False)
+ api_client.credentials(
+ HTTP_AUTHORIZATION=f"Bearer {access_token}")
+
+ response = api_client.get(url)
+ request = response.wsgi_request
+
+ assert response.status_code == 403
+ assert isinstance(request.user, AnonymousUser)
+
+ # check for failed authentication when token format is invalid
+ mock_keycloak(mocker)
+ api_client.credentials(
+ HTTP_AUTHORIZATION=f"Bearer invalid-token-format")
+
+ response = api_client.get(url)
+ request = response.wsgi_request
+
+ assert response.status_code == 403
+ assert isinstance(request.user, AnonymousUser)
+
+
+def test_drf_oidc_auth_invalid_or_missing_authorization_type(api_client):
+ url = reverse('api-1-stat-counters')
+
+ access_token = sample_data.oidc_profile['access_token']
+
+ # missing authorization type
+ api_client.credentials(
+ HTTP_AUTHORIZATION=f"{access_token}")
+
+ response = api_client.get(url)
+ request = response.wsgi_request
+
+ assert response.status_code == 403
+ assert isinstance(request.user, AnonymousUser)
+
+ # invalid authorization type
+ api_client.credentials(
+ HTTP_AUTHORIZATION=f"Foo token")
+
+ response = api_client.get(url)
+ request = response.wsgi_request
+
+ assert response.status_code == 403
+ assert isinstance(request.user, AnonymousUser)
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
--- a/swh/web/tests/auth/test_backends.py
+++ b/swh/web/tests/auth/test_backends.py
@@ -11,6 +11,9 @@
from django.conf import settings
+from rest_framework.exceptions import AuthenticationFailed
+
+from swh.web.auth.backends import OIDCBearerTokenAuthentication
from swh.web.auth.models import OIDCUser
from swh.web.common.utils import reverse
@@ -79,3 +82,82 @@
user = _authenticate_user(request_factory)
assert user is None
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_success(mocker,
+ api_request_factory):
+ url = reverse('api-1-stat-counters')
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ kc_oidc_mock = mock_keycloak(mocker)
+
+ access_token = sample_data.oidc_profile['access_token']
+
+ request = api_request_factory.get(
+ url, HTTP_AUTHORIZATION=f"Bearer {access_token}")
+
+ # first authentication
+ user, _ = drf_auth_backend.authenticate(request)
+ _check_authenticated_user(user)
+ # oidc_profile is not filled when authenticating through bearer token
+ assert hasattr(user, 'access_token') and user.access_token is None
+
+ # second authentication, should fetch userinfo from cache
+ # until token expires
+ user, _ = drf_auth_backend.authenticate(request)
+ _check_authenticated_user(user)
+ assert hasattr(user, 'access_token') and user.access_token is None
+
+ # check userinfo request to Keycloak has been sent only once
+ kc_oidc_mock.userinfo.assert_called_once_with(access_token)
+
+
+@pytest.mark.django_db
+def test_drf_oidc_bearer_token_auth_backend_failure(mocker,
+ api_request_factory):
+
+ url = reverse('api-1-stat-counters')
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ # simulate a failed authentication with a bearer token in expected format
+ mock_keycloak(mocker, auth_success=False)
+
+ access_token = sample_data.oidc_profile['access_token']
+
+ request = api_request_factory.get(
+ url, HTTP_AUTHORIZATION=f"Bearer {access_token}")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+ # simulate a failed authentication with an invalid bearer token format
+ mock_keycloak(mocker)
+
+ request = api_request_factory.get(
+ url, HTTP_AUTHORIZATION=f"Bearer invalid-token-format")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+
+def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory):
+
+ url = reverse('api-1-stat-counters')
+ drf_auth_backend = OIDCBearerTokenAuthentication()
+
+ access_token = sample_data.oidc_profile['access_token']
+
+ # Invalid authorization type
+ request = api_request_factory.get(
+ url, HTTP_AUTHORIZATION=f"Foo token")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
+
+ # Missing authorization type
+ request = api_request_factory.get(
+ url, HTTP_AUTHORIZATION=f"{access_token}")
+
+ with pytest.raises(AuthenticationFailed):
+ drf_auth_backend.authenticate(request)
diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py
--- a/swh/web/tests/data.py
+++ b/swh/web/tests/data.py
@@ -7,18 +7,13 @@
import random
from copy import deepcopy
-from typing import Dict
-
-from rest_framework.decorators import api_view
-from rest_framework.response import Response
from swh.indexer.fossology_license import FossologyLicenseIndexer
from swh.indexer.mimetype import MimetypeIndexer
from swh.indexer.ctags import CtagsIndexer
from swh.indexer.storage import get_indexer_storage
-from swh.model import from_disk
from swh.model.hashutil import hash_to_hex, DEFAULT_ALGORITHMS
-from swh.model.model import Content, Directory, Origin
+from swh.model.model import Directory, Origin
from swh.loader.git.from_disk import GitLoaderFromArchive
from swh.search import get_search
from swh.storage.algos.dir_iterators import dir_iterator
@@ -28,7 +23,6 @@
_re_encode_content
)
from swh.web.common import service
-from swh.web.common.highlightjs import get_hljs_language_from_filename
# Module used to initialize data that will be provided as tests input
@@ -338,147 +332,3 @@
service.storage = storage
service.idx_storage = idx_storage
service.search = search
-
-
-# Implement some special endpoints used to provide input tests data
-# when executing end to end tests with cypress
-
-_content_code_data_exts = {} # type: Dict[str, Dict[str, str]]
-_content_code_data_filenames = {} # type: Dict[str, Dict[str, str]]
-_content_other_data_exts = {} # type: Dict[str, Dict[str, str]]
-
-
-def _init_content_tests_data(data_path, data_dict, ext_key):
- """
- Helper function to read the content of a directory, store it
- into a test archive and add some files metadata (sha1 and/or
- expected programming language) in a dict.
-
- Args:
- data_path (str): path to a directory relative to the tests
- folder of swh-web
- data_dict (dict): the dict that will store files metadata
- ext_key (bool): whether to use file extensions or filenames
- as dict keys
- """
- test_contents_dir = os.path.join(
- os.path.dirname(__file__), data_path).encode('utf-8')
- directory = from_disk.Directory.from_disk(path=test_contents_dir)
-
- contents = []
- for name, obj in directory.items():
- if isinstance(obj, from_disk.Content):
- c = obj.to_model().with_data().to_dict()
- c['status'] = 'visible'
- sha1 = hash_to_hex(c['sha1'])
- if ext_key:
- key = name.decode('utf-8').split('.')[-1]
- filename = 'test.' + key
- else:
- filename = name.decode('utf-8').split('/')[-1]
- key = filename
- language = get_hljs_language_from_filename(filename)
- data_dict[key] = {'sha1': sha1,
- 'language': language}
- contents.append(Content.from_dict(c))
- storage = get_tests_data()['storage']
- storage.content_add(contents)
-
-
-def _init_content_code_data_exts():
- """
- Fill a global dictionary which maps source file extension to
- a code content example.
- """
- global _content_code_data_exts
- _init_content_tests_data('resources/contents/code/extensions',
- _content_code_data_exts, True)
-
-
-def _init_content_other_data_exts():
- """
- Fill a global dictionary which maps a file extension to
- a content example.
- """
- global _content_other_data_exts
- _init_content_tests_data('resources/contents/other/extensions',
- _content_other_data_exts, True)
-
-
-def _init_content_code_data_filenames():
- """
- Fill a global dictionary which maps a filename to
- a content example.
- """
- global _content_code_data_filenames
- _init_content_tests_data('resources/contents/code/filenames',
- _content_code_data_filenames, False)
-
-
-if config.get_config()['e2e_tests_mode']:
- _init_content_code_data_exts()
- _init_content_other_data_exts()
- _init_content_code_data_filenames()
-
-
-@api_view(['GET'])
-def get_content_code_data_all_exts(request):
- """
- Endpoint implementation returning a list of all source file
- extensions to test for highlighting using cypress.
- """
- return Response(sorted(_content_code_data_exts.keys()),
- status=200, content_type='application/json')
-
-
-@api_view(['GET'])
-def get_content_code_data_by_ext(request, ext):
- """
- Endpoint implementation returning metadata of a code content example
- based on the source file extension.
- """
- data = None
- status = 404
- if ext in _content_code_data_exts:
- data = _content_code_data_exts[ext]
- status = 200
- return Response(data, status=status, content_type='application/json')
-
-
-@api_view(['GET'])
-def get_content_other_data_by_ext(request, ext):
- """
- Endpoint implementation returning metadata of a content example
- based on the file extension.
- """
- _init_content_other_data_exts()
- data = None
- status = 404
- if ext in _content_other_data_exts:
- data = _content_other_data_exts[ext]
- status = 200
- return Response(data, status=status, content_type='application/json')
-
-
-@api_view(['GET'])
-def get_content_code_data_all_filenames(request):
- """
- Endpoint implementation returning a list of all source filenames
- to test for highlighting using cypress.
- """
- return Response(sorted(_content_code_data_filenames.keys()),
- status=200, content_type='application/json')
-
-
-@api_view(['GET'])
-def get_content_code_data_by_filename(request, filename):
- """
- Endpoint implementation returning metadata of a code content example
- based on the source filename.
- """
- data = None
- status = 404
- if filename in _content_code_data_filenames:
- data = _content_code_data_filenames[filename]
- status = 200
- return Response(data, status=status, content_type='application/json')
diff --git a/swh/web/tests/views.py b/swh/web/tests/views.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/views.py
@@ -0,0 +1,161 @@
+# Copyright (C) 2018-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# Implement some special endpoints used to provide input tests data
+# when executing end to end tests with cypress
+
+import os
+
+from typing import Dict
+
+from rest_framework.decorators import api_view
+from rest_framework.response import Response
+
+from swh.model import from_disk
+from swh.model.hashutil import hash_to_hex
+from swh.model.model import Content
+from swh.web.common.highlightjs import get_hljs_language_from_filename
+from swh.web.config import get_config
+from swh.web.tests.data import get_tests_data
+
+_content_code_data_exts = {} # type: Dict[str, Dict[str, str]]
+_content_code_data_filenames = {} # type: Dict[str, Dict[str, str]]
+_content_other_data_exts = {} # type: Dict[str, Dict[str, str]]
+
+
+def _init_content_tests_data(data_path, data_dict, ext_key):
+ """
+ Helper function to read the content of a directory, store it
+ into a test archive and add some files metadata (sha1 and/or
+ expected programming language) in a dict.
+
+ Args:
+ data_path (str): path to a directory relative to the tests
+ folder of swh-web
+ data_dict (dict): the dict that will store files metadata
+ ext_key (bool): whether to use file extensions or filenames
+ as dict keys
+ """
+ test_contents_dir = os.path.join(
+ os.path.dirname(__file__), data_path).encode('utf-8')
+ directory = from_disk.Directory.from_disk(path=test_contents_dir)
+
+ contents = []
+ for name, obj in directory.items():
+ if isinstance(obj, from_disk.Content):
+ c = obj.to_model().with_data().to_dict()
+ c['status'] = 'visible'
+ sha1 = hash_to_hex(c['sha1'])
+ if ext_key:
+ key = name.decode('utf-8').split('.')[-1]
+ filename = 'test.' + key
+ else:
+ filename = name.decode('utf-8').split('/')[-1]
+ key = filename
+ language = get_hljs_language_from_filename(filename)
+ data_dict[key] = {'sha1': sha1,
+ 'language': language}
+ contents.append(Content.from_dict(c))
+ storage = get_tests_data()['storage']
+ storage.content_add(contents)
+
+
+def _init_content_code_data_exts():
+ """
+ Fill a global dictionary which maps source file extension to
+ a code content example.
+ """
+ global _content_code_data_exts
+ _init_content_tests_data('resources/contents/code/extensions',
+ _content_code_data_exts, True)
+
+
+def _init_content_other_data_exts():
+ """
+ Fill a global dictionary which maps a file extension to
+ a content example.
+ """
+ global _content_other_data_exts
+ _init_content_tests_data('resources/contents/other/extensions',
+ _content_other_data_exts, True)
+
+
+def _init_content_code_data_filenames():
+ """
+ Fill a global dictionary which maps a filename to
+ a content example.
+ """
+ global _content_code_data_filenames
+ _init_content_tests_data('resources/contents/code/filenames',
+ _content_code_data_filenames, False)
+
+
+if get_config()['e2e_tests_mode']:
+ _init_content_code_data_exts()
+ _init_content_other_data_exts()
+ _init_content_code_data_filenames()
+
+
+@api_view(['GET'])
+def get_content_code_data_all_exts(request):
+ """
+ Endpoint implementation returning a list of all source file
+ extensions to test for highlighting using cypress.
+ """
+ return Response(sorted(_content_code_data_exts.keys()),
+ status=200, content_type='application/json')
+
+
+@api_view(['GET'])
+def get_content_code_data_by_ext(request, ext):
+ """
+ Endpoint implementation returning metadata of a code content example
+ based on the source file extension.
+ """
+ data = None
+ status = 404
+ if ext in _content_code_data_exts:
+ data = _content_code_data_exts[ext]
+ status = 200
+ return Response(data, status=status, content_type='application/json')
+
+
+@api_view(['GET'])
+def get_content_other_data_by_ext(request, ext):
+ """
+ Endpoint implementation returning metadata of a content example
+ based on the file extension.
+ """
+ _init_content_other_data_exts()
+ data = None
+ status = 404
+ if ext in _content_other_data_exts:
+ data = _content_other_data_exts[ext]
+ status = 200
+ return Response(data, status=status, content_type='application/json')
+
+
+@api_view(['GET'])
+def get_content_code_data_all_filenames(request):
+ """
+ Endpoint implementation returning a list of all source filenames
+ to test for highlighting using cypress.
+ """
+ return Response(sorted(_content_code_data_filenames.keys()),
+ status=200, content_type='application/json')
+
+
+@api_view(['GET'])
+def get_content_code_data_by_filename(request, filename):
+ """
+ Endpoint implementation returning metadata of a code content example
+ based on the source filename.
+ """
+ data = None
+ status = 404
+ if filename in _content_code_data_filenames:
+ data = _content_code_data_filenames[filename]
+ status = 200
+ return Response(data, status=status, content_type='application/json')
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Aug 17, 7:59 PM (1 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217761
Attached To
D2747: Add DRF bearer token authentication using OpenID Connect
Event Timeline
Log In to Comment