# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple

from django.core.cache import cache
from django.http import HttpRequest
from django.utils import timezone

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed

import sentry_sdk

from swh.web.auth.keycloak import KeycloakOpenIDConnect
from swh.web.auth.utils import get_oidc_client
from swh.web.auth.models import OIDCUser

# OpenID Connect client to communicate with Keycloak server
_oidc_client: KeycloakOpenIDConnect = get_oidc_client()


def _oidc_user_from_info(userinfo: Dict[str, Any]) -> OIDCUser:
    """
    Create a transient OIDCUser instance (never saved to database)
    from the userinfo data returned by the Keycloak userinfo endpoint.
    """
    # compute an integer user identifier for the Django User model by
    # stripping the dashes of the UUID4 identifier generated by Keycloak
    # and converting its hexadecimal representation to decimal
    user_id = int(userinfo['sub'].replace('-', ''), 16)
    # create a Django user that will not be saved to database
    user = OIDCUser(id=user_id,
                    username=userinfo['preferred_username'],
                    password='',
                    first_name=userinfo['given_name'],
                    last_name=userinfo['family_name'],
                    email=userinfo['email'])
    # set is_staff user property based on groups
    user.is_staff = '/staff' in userinfo['groups']
    # add userinfo sub to custom User proxy model
    user.sub = userinfo['sub']
    return user


def _oidc_user_from_profile(oidc_profile: Dict[str, Any],
                            userinfo: Optional[Dict[str, Any]] = None
                            ) -> Tuple[OIDCUser, Dict[str, Any]]:
    """
    Create a transient OIDCUser from an OIDC profile (tokens dict);
    the Keycloak userinfo endpoint is only queried when ``userinfo``
    is not supplied by the caller (e.g. restored from cache).

    Returns a (user, userinfo) tuple so callers can cache the userinfo.
    """
    # get access token
    access_token = oidc_profile['access_token']
    # request OIDC userinfo when not already available
    if userinfo is None:
        userinfo = _oidc_client.userinfo(access_token)
    # create OIDCUser from userinfo
    user = _oidc_user_from_info(userinfo)
    # decode JWT token to get the authentication init time
    decoded_token = _oidc_client.decode_token(access_token)
    # NOTE(review): fromtimestamp() without a tz argument yields a naive,
    # server-local datetime; the expiration values below inherit that —
    # confirm all consumers compare against naive local datetimes as well
    auth_datetime = datetime.fromtimestamp(decoded_token['auth_time'])
    # compute OIDC tokens expiration dates
    oidc_profile['access_expiration'] = (
        auth_datetime + timedelta(seconds=oidc_profile['expires_in']))
    oidc_profile['refresh_expiration'] = (
        auth_datetime + timedelta(seconds=oidc_profile['refresh_expires_in']))
    # add OIDC profile data to custom User proxy model
    for key, val in oidc_profile.items():
        if hasattr(user, key):
            setattr(user, key, val)
    return user, userinfo


class OIDCAuthorizationCodePKCEBackend:
    """
    Django authentication backend using the OpenID Connect authorization
    code flow with PKCE, for users logging in through a web browser.
    """

    def authenticate(self, request: HttpRequest, code: str, code_verifier: str,
                     redirect_uri: str) -> Optional[OIDCUser]:
        user = None
        try:
            # try to authenticate user with OIDC PKCE authorization code flow
            oidc_profile = _oidc_client.authorization_code(
                code, redirect_uri, code_verifier=code_verifier)
            # create Django user
            user, userinfo = _oidc_user_from_profile(oidc_profile)
            # save authenticated user data in cache for the lifetime of the
            # refresh token; get_user() rebuilds the user from that entry
            cache.set(f'user_{user.id}', {'userinfo': userinfo,
                                          'oidc_profile': oidc_profile},
                      timeout=oidc_profile['refresh_expires_in'])
        except Exception as e:
            # returning None makes Django treat authentication as failed;
            # the error is still reported to sentry for diagnosis
            sentry_sdk.capture_exception(e)
        return user

    def get_user(self, user_id: int) -> Optional[OIDCUser]:
        # get user data from cache; guard clause avoids nested branching
        user_oidc_data = cache.get(f'user_{user_id}')
        if not user_oidc_data:
            return None
        try:
            user, _ = _oidc_user_from_profile(
                user_oidc_data['oidc_profile'],
                user_oidc_data['userinfo'])
            # restore auth backend
            setattr(user, 'backend', f'{__name__}.{self.__class__.__name__}')
            return user
        except Exception as e:
            sentry_sdk.capture_exception(e)
            return None


class OIDCBearerTokenAuthentication(BaseAuthentication):
    """
    Django REST Framework authentication plugin checking a Keycloak
    bearer access token passed in the HTTP Authorization header.
    """

    def authenticate(self, request):
        auth_header = request.META.get('HTTP_AUTHORIZATION')
        if auth_header is None:
            # no credentials provided: let other plugins have a try
            return None

        try:
            auth_type, token = auth_header.split(' ', 1)
        except ValueError:
            raise AuthenticationFailed(
                'Invalid HTTP authorization header format')

        # RFC 7235: the authentication scheme token is case-insensitive,
        # so accept "bearer"/"BEARER" as well as the canonical "Bearer"
        if auth_type.lower() != 'bearer':
            raise AuthenticationFailed(
                (f'Invalid or unsupported HTTP authorization'
                 f' type ({auth_type}).'))

        try:
            # attempt to decode token
            decoded = _oidc_client.decode_token(token)
            userinfo = cache.get(decoded['sub'])
            if userinfo:
                user = _oidc_user_from_info(userinfo)
            else:
                # get OIDC userinfo
                userinfo = _oidc_client.userinfo(token)
                # create Django user
                user = _oidc_user_from_info(userinfo)
                # cache userinfo until token expires; ttl is clamped to the
                # token lifetime so a skewed clock cannot extend the cache
                max_ttl = decoded['exp'] - decoded['auth_time'] - 1
                ttl = decoded['exp'] - int(timezone.now().timestamp()) - 1
                ttl = max(0, min(ttl, max_ttl))
                cache.set(decoded['sub'], userinfo, timeout=ttl)

        except Exception as e:
            sentry_sdk.capture_exception(e)
            # NOTE(review): str(e) may expose internal error details to API
            # clients — consider mapping to a generic message in production
            raise AuthenticationFailed(str(e))

        return user, None
def _stat_counters(request):
    """
    Return the archive object counters and, when configured, their
    history as a single JSON document.
    """
    stat = service.stat_counters()
    url = get_config()['history_counters_url']
    stat_counters_history = 'null'
    if url:
        try:
            response = requests.get(url, timeout=5)
            # validate the remote payload before embedding it verbatim:
            # an upstream error page would otherwise produce an invalid
            # JSON response for our own clients
            json.loads(response.text)
            stat_counters_history = response.text
        except Exception as exc:
            # best effort: fall back to a null history on any failure
            sentry_sdk.capture_exception(exc)
    # build the document by direct string formatting to avoid parsing
    # and re-serializing the potentially large history payload
    json_data = '{"stat_counters": %s, "stat_counters_history": %s}' % (
        json.dumps(stat), stat_counters_history)
    return HttpResponse(json_data, content_type='application/json')


urlpatterns = [
    url(r'^', include('swh.web.misc.coverage')),
    url(r'^jslicenses/$', _jslicenses, name='jslicenses'),
    url(r'^', include('swh.web.misc.origin_save')),
    url(r'^stat_counters/', _stat_counters, name='stat-counters'),
    url(r'^', include('swh.web.misc.badges')),
    url(r'^metrics/prometheus/$', prometheus_metrics,
        name='metrics-prometheus'),
]


# when running end to end tests through cypress, declare some extra
# endpoints to provide input data for some of those tests
if get_config()['e2e_tests_mode']:
    from swh.web.tests.views import (
        get_content_code_data_by_ext,
        get_content_other_data_by_ext,
        get_content_code_data_all_exts,
        get_content_code_data_by_filename,
        get_content_code_data_all_filenames,
    )  # noqa
    urlpatterns.append(
        url(r'^tests/data/content/code/extension/(?P<ext>.+)/$',
            get_content_code_data_by_ext,
            name='tests-content-code-extension'))
    urlpatterns.append(
        url(r'^tests/data/content/other/extension/(?P<ext>.+)/$',
            get_content_other_data_by_ext,
            name='tests-content-other-extension'))
    urlpatterns.append(url(r'^tests/data/content/code/extensions/$',
                           get_content_code_data_all_exts,
                           name='tests-content-code-extensions'))
    urlpatterns.append(
        url(r'^tests/data/content/code/filename/(?P<filename>.+)/$',
            get_content_code_data_by_filename,
            name='tests-content-code-filename'))
    urlpatterns.append(url(r'^tests/data/content/code/filenames/$',
                           get_content_code_data_all_filenames,
                           name='tests-content-code-filenames'))
# Copyright (C) 2017-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""
Django common settings for swh-web.
"""

import os
import sys

from typing import Any, Dict

from swh.web.config import get_config

swh_web_config = get_config()

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))

# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = swh_web_config['secret_key']

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = swh_web_config['debug']
DEBUG_PROPAGATE_EXCEPTIONS = swh_web_config['debug']

ALLOWED_HOSTS = ['127.0.0.1', 'localhost'] + swh_web_config['allowed_hosts']

# Application definition

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'swh.web.common',
    'swh.web.api',
    'swh.web.browse',
    'webpack_loader',
    'django_js_reverse',
    'corsheaders',
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'swh.web.common.middlewares.ThrottlingHeadersMiddleware',
]

# Compress all assets (static ones and dynamically generated html)
# served by django in a local development environment context.
# In a production environment, assets compression will be directly
# handled by web servers like apache or nginx.
if swh_web_config['serve_assets']:
    MIDDLEWARE.insert(0, 'django.middleware.gzip.GZipMiddleware')

ROOT_URLCONF = 'swh.web.urls'

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [os.path.join(PROJECT_DIR, "../templates")],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
                'swh.web.common.utils.context_processor'
            ],
            'libraries': {
                'swh_templatetags': 'swh.web.common.swh_templatetags',
            },
        },
    },
]

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': swh_web_config['development_db'],
    }
}

# Password validation
# https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators

AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',  # noqa
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',  # noqa
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',  # noqa
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',  # noqa
    },
]

# Internationalization
# https://docs.djangoproject.com/en/1.11/topics/i18n/

LANGUAGE_CODE = 'en-us'

TIME_ZONE = 'UTC'

USE_I18N = True

USE_L10N = True

USE_TZ = True

# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.11/howto/static-files/

STATIC_URL = '/static/'

# static folder location when swh-web has been installed with pip
STATIC_DIR = os.path.join(sys.prefix, 'share/swh/web/static')

if not os.path.exists(STATIC_DIR):
    # static folder location when developping swh-web
    STATIC_DIR = os.path.join(PROJECT_DIR, '../../../static')

STATICFILES_DIRS = [STATIC_DIR]

INTERNAL_IPS = ['127.0.0.1']

throttle_rates = {}

http_requests = ['GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH']

throttling = swh_web_config['throttling']
for limiter_scope, limiter_conf in throttling['scopes'].items():
    if 'default' in limiter_conf['limiter_rate']:
        throttle_rates[limiter_scope] = limiter_conf['limiter_rate']['default']
    # for backward compatibility
    else:
        throttle_rates[limiter_scope] = limiter_conf['limiter_rate']
    # register sub scopes specific for HTTP request types
    for http_request in http_requests:
        if http_request in limiter_conf['limiter_rate']:
            throttle_rates[limiter_scope + '_' + http_request.lower()] = \
                limiter_conf['limiter_rate'][http_request]

REST_FRAMEWORK: Dict[str, Any] = {
    'DEFAULT_RENDERER_CLASSES': (
        'rest_framework.renderers.JSONRenderer',
        'swh.web.api.renderers.YAMLRenderer',
        'rest_framework.renderers.TemplateHTMLRenderer'
    ),
    'DEFAULT_THROTTLE_CLASSES': (
        'swh.web.common.throttling.SwhWebRateThrottle',
    ),
    'DEFAULT_THROTTLE_RATES': throttle_rates,
    # session authentication handles browser clients while the OIDC
    # bearer token plugin handles third-party HTTP API clients
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
        'swh.web.auth.backends.OIDCBearerTokenAuthentication',
    ],
}

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'filters': {
        'require_debug_false': {
            '()': 'django.utils.log.RequireDebugFalse',
        },
        'require_debug_true': {
            '()': 'django.utils.log.RequireDebugTrue',
        },
    },
    'formatters': {
        'request': {
            'format': '[%(asctime)s] [%(levelname)s] %(request)s %(status_code)s',  # noqa
            'datefmt': "%d/%b/%Y %H:%M:%S"
        },
        'simple': {
            'format': '[%(asctime)s] [%(levelname)s] %(message)s',
            'datefmt': "%d/%b/%Y %H:%M:%S"
        },
        'verbose': {
            'format': '[%(asctime)s] [%(levelname)s] %(name)s.%(funcName)s:%(lineno)s - %(message)s',  # noqa
            'datefmt': "%d/%b/%Y %H:%M:%S"
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'filters': ['require_debug_true'],
            'class': 'logging.StreamHandler',
            'formatter': 'simple'
        },
        'file': {
            'level': 'WARNING',
            'filters': ['require_debug_false'],
            'class': 'logging.FileHandler',
            'filename': os.path.join(swh_web_config['log_dir'], 'swh-web.log'),
            'formatter': 'simple'
        },
        'file_request': {
            'level': 'WARNING',
            'filters': ['require_debug_false'],
            'class': 'logging.FileHandler',
            'filename': os.path.join(swh_web_config['log_dir'], 'swh-web.log'),
            'formatter': 'request'
        },
        'console_verbose': {
            'level': 'DEBUG',
            'filters': ['require_debug_true'],
            'class': 'logging.StreamHandler',
            'formatter': 'verbose'
        },
        'file_verbose': {
            'level': 'WARNING',
            'filters': ['require_debug_false'],
            'class': 'logging.FileHandler',
            'filename': os.path.join(swh_web_config['log_dir'], 'swh-web.log'),
            'formatter': 'verbose'
        },
        'null': {
            'class': 'logging.NullHandler',
        },
    },
    'loggers': {
        '': {
            'handlers': ['console_verbose', 'file_verbose'],
            'level': 'DEBUG' if DEBUG else 'WARNING',
        },
        'django': {
            'handlers': ['console'],
            'level': 'DEBUG' if DEBUG else 'WARNING',
            'propagate': False,
        },
        'django.request': {
            'handlers': ['file_request'],
            'level': 'DEBUG' if DEBUG else 'WARNING',
            'propagate': False,
        },
        'django.db.backends': {
            'handlers': ['null'],
            'propagate': False
        },
        'django.utils.autoreload': {
            'level': 'INFO',
        },
    },
}

WEBPACK_LOADER = {
    'DEFAULT': {
        'CACHE': False,
        'BUNDLE_DIR_NAME': './',
        'STATS_FILE': os.path.join(STATIC_DIR, 'webpack-stats.json'),
        'POLL_INTERVAL': 0.1,
        'TIMEOUT': None,
        'IGNORE': ['.+\\.hot-update.js', '.+\\.map']
    }
}

LOGIN_URL = '/admin/login/'
LOGIN_REDIRECT_URL = 'admin'

SESSION_ENGINE = 'django.contrib.sessions.backends.cache'

CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'
    },
    'db_cache': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'swh_web_cache',
    }
}

JS_REVERSE_JS_MINIFY = False

CORS_ORIGIN_ALLOW_ALL = True
CORS_URLS_REGEX = r'^/badge/.*$'

AUTHENTICATION_BACKENDS = [
    'django.contrib.auth.backends.ModelBackend',
    'swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend',
]

OIDC_SWH_WEB_CLIENT_ID = 'swh-web'
# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import pytest

from django.contrib.auth.models import AnonymousUser, User

from swh.web.auth.models import OIDCUser
from swh.web.common.utils import reverse

from .keycloak_mock import mock_keycloak
from . import sample_data


@pytest.mark.django_db
def test_drf_django_session_auth_success(mocker, client):
    """
    Check user gets authenticated when querying the web api
    through a web browser.
    """
    url = reverse('api-1-stat-counters')

    mock_keycloak(mocker)
    client.login(code='', code_verifier='', redirect_uri='')

    response = client.get(url)
    request = response.wsgi_request

    assert response.status_code == 200

    # user should be authenticated
    assert isinstance(request.user, OIDCUser)

    # check remote user has not been saved to Django database
    with pytest.raises(User.DoesNotExist):
        User.objects.get(username=request.user.username)


@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_success(mocker, api_client):
    """
    Check user gets authenticated when querying the web api
    through an HTTP client using bearer token authentication.
    """
    url = reverse('api-1-stat-counters')

    access_token = sample_data.oidc_profile['access_token']

    mock_keycloak(mocker)
    api_client.credentials(
        HTTP_AUTHORIZATION=f"Bearer {access_token}")

    response = api_client.get(url)
    request = response.wsgi_request

    assert response.status_code == 200

    # user should be authenticated
    assert isinstance(request.user, OIDCUser)

    # check remote user has not been saved to Django database
    with pytest.raises(User.DoesNotExist):
        User.objects.get(username=request.user.username)


@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_failure(mocker, api_client):
    """
    Check authentication fails (HTTP 403) for a rejected or malformed
    bearer token.
    """
    url = reverse('api-1-stat-counters')

    access_token = sample_data.oidc_profile['access_token']

    # check for failed authentication but with expected token format
    mock_keycloak(mocker, auth_success=False)
    api_client.credentials(
        HTTP_AUTHORIZATION=f"Bearer {access_token}")

    response = api_client.get(url)
    request = response.wsgi_request

    assert response.status_code == 403
    assert isinstance(request.user, AnonymousUser)

    # check for failed authentication when token format is invalid
    mock_keycloak(mocker)
    # literal token: no f-string prefix needed
    api_client.credentials(
        HTTP_AUTHORIZATION="Bearer invalid-token-format")

    response = api_client.get(url)
    request = response.wsgi_request

    assert response.status_code == 403
    assert isinstance(request.user, AnonymousUser)


def test_drf_oidc_auth_invalid_or_missing_authorization_type(api_client):
    """
    Check authentication fails (HTTP 403) when the Authorization header
    has a missing or unsupported scheme.
    """
    url = reverse('api-1-stat-counters')

    access_token = sample_data.oidc_profile['access_token']

    # missing authorization type
    api_client.credentials(HTTP_AUTHORIZATION=access_token)

    response = api_client.get(url)
    request = response.wsgi_request

    assert response.status_code == 403
    assert isinstance(request.user, AnonymousUser)

    # invalid authorization type
    api_client.credentials(HTTP_AUTHORIZATION="Foo token")

    response = api_client.get(url)
    request = response.wsgi_request

    assert response.status_code == 403
    assert isinstance(request.user, AnonymousUser)
@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_success(mocker, request_factory):
    """
    Check successful login through the OIDC authorization code flow with
    PKCE and that the OIDC profile data is mapped onto the Django user.
    """
    kc_oidc_mock = mock_keycloak(mocker)
    oidc_profile = sample_data.oidc_profile
    user = _authenticate_user(request_factory)

    _check_authenticated_user(user)

    decoded_token = kc_oidc_mock.decode_token(
        sample_data.oidc_profile['access_token'])
    auth_datetime = datetime.fromtimestamp(decoded_token['auth_time'])
    access_expiration = (
        auth_datetime + timedelta(seconds=oidc_profile['expires_in']))
    refresh_expiration = (
        auth_datetime + timedelta(seconds=oidc_profile['refresh_expires_in']))

    # OIDC profile data must have been copied onto the user
    assert user.access_token == oidc_profile['access_token']
    assert user.access_expiration == access_expiration
    assert user.id_token == oidc_profile['id_token']
    assert user.refresh_token == oidc_profile['refresh_token']
    assert user.refresh_expiration == refresh_expiration
    assert user.scope == oidc_profile['scope']
    assert user.session_state == oidc_profile['session_state']

    backend_path = 'swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend'
    assert user.backend == backend_path
    backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path)
    assert get_backends()[backend_idx].get_user(user.id) == user


@pytest.mark.django_db
def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory):
    """Authentication must yield None when Keycloak rejects the code."""
    mock_keycloak(mocker, auth_success=False)

    user = _authenticate_user(request_factory)

    assert user is None


@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_backend_success(mocker,
                                                    api_request_factory):
    """
    Check bearer token authentication succeeds and that the userinfo
    fetched from Keycloak is cached for subsequent requests.
    """
    url = reverse('api-1-stat-counters')
    drf_auth_backend = OIDCBearerTokenAuthentication()

    kc_oidc_mock = mock_keycloak(mocker)

    access_token = sample_data.oidc_profile['access_token']

    request = api_request_factory.get(
        url, HTTP_AUTHORIZATION=f"Bearer {access_token}")

    # first authentication
    user, _ = drf_auth_backend.authenticate(request)
    _check_authenticated_user(user)
    # oidc_profile is not filled when authenticating through bearer token
    assert hasattr(user, 'access_token') and user.access_token is None

    # second authentication, should fetch userinfo from cache
    # until token expires
    user, _ = drf_auth_backend.authenticate(request)
    _check_authenticated_user(user)
    assert hasattr(user, 'access_token') and user.access_token is None

    # check user request to keycloak has been sent only once
    kc_oidc_mock.userinfo.assert_called_once_with(access_token)


@pytest.mark.django_db
def test_drf_oidc_bearer_token_auth_backend_failure(mocker,
                                                    api_request_factory):
    """Bearer authentication must raise for rejected or malformed tokens."""
    url = reverse('api-1-stat-counters')
    drf_auth_backend = OIDCBearerTokenAuthentication()

    # simulate a failed authentication with a bearer token in expected format
    mock_keycloak(mocker, auth_success=False)

    access_token = sample_data.oidc_profile['access_token']

    request = api_request_factory.get(
        url, HTTP_AUTHORIZATION=f"Bearer {access_token}")

    with pytest.raises(AuthenticationFailed):
        drf_auth_backend.authenticate(request)

    # simulate a failed authentication with an invalid bearer token format
    mock_keycloak(mocker)

    # literal token: no f-string prefix needed
    request = api_request_factory.get(
        url, HTTP_AUTHORIZATION="Bearer invalid-token-format")

    with pytest.raises(AuthenticationFailed):
        drf_auth_backend.authenticate(request)


def test_drf_oidc_auth_invalid_or_missing_auth_type(api_request_factory):
    """Unsupported or absent authorization schemes must be rejected."""
    url = reverse('api-1-stat-counters')
    drf_auth_backend = OIDCBearerTokenAuthentication()

    access_token = sample_data.oidc_profile['access_token']

    # invalid authorization type
    request = api_request_factory.get(url, HTTP_AUTHORIZATION="Foo token")

    with pytest.raises(AuthenticationFailed):
        drf_auth_backend.authenticate(request)

    # missing authorization type
    request = api_request_factory.get(
        url, HTTP_AUTHORIZATION=access_token)

    with pytest.raises(AuthenticationFailed):
        drf_auth_backend.authenticate(request)
more information import os import random from copy import deepcopy -from typing import Dict - -from rest_framework.decorators import api_view -from rest_framework.response import Response from swh.indexer.fossology_license import FossologyLicenseIndexer from swh.indexer.mimetype import MimetypeIndexer from swh.indexer.ctags import CtagsIndexer from swh.indexer.storage import get_indexer_storage -from swh.model import from_disk from swh.model.hashutil import hash_to_hex, DEFAULT_ALGORITHMS -from swh.model.model import Content, Directory, Origin +from swh.model.model import Directory, Origin from swh.loader.git.from_disk import GitLoaderFromArchive from swh.search import get_search from swh.storage.algos.dir_iterators import dir_iterator from swh.web import config from swh.web.browse.utils import ( get_mimetype_and_encoding_for_content, prepare_content_for_display, _re_encode_content ) from swh.web.common import service -from swh.web.common.highlightjs import get_hljs_language_from_filename # Module used to initialize data that will be provided as tests input # Configuration for git loader _TEST_LOADER_CONFIG = { 'storage': { 'cls': 'memory', }, 'save_data': False, 'max_content_size': 100 * 1024 * 1024, } # Base content indexer configuration _TEST_INDEXER_BASE_CONFIG = { 'storage': { 'cls': 'memory' }, 'objstorage': { 'cls': 'memory', 'args': {}, }, 'indexer_storage': { 'cls': 'memory', 'args': {}, } } def random_sha1(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20))) def random_sha256(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) def random_blake2s256(): return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32))) def random_content(): return { 'sha1': random_sha1(), 'sha1_git': random_sha1(), 'sha256': random_sha256(), 'blake2s256': random_blake2s256(), } # MimetypeIndexer with custom configuration for tests class _MimetypeIndexer(MimetypeIndexer): def parse_config_file(self, *args, **kwargs): return { 
**_TEST_INDEXER_BASE_CONFIG, 'tools': { 'name': 'file', 'version': '1:5.30-1+deb9u1', 'configuration': { "type": "library", "debian-package": "python3-magic" } } } # FossologyLicenseIndexer with custom configuration for tests class _FossologyLicenseIndexer(FossologyLicenseIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.fossology.license', 'tools': { 'name': 'nomos', 'version': '3.1.0rc2-31-ga2cbb8c', 'configuration': { 'command_line': 'nomossa ', }, } } # CtagsIndexer with custom configuration for tests class _CtagsIndexer(CtagsIndexer): def parse_config_file(self, *args, **kwargs): return { **_TEST_INDEXER_BASE_CONFIG, 'workdir': '/tmp/swh/indexer.ctags', 'languages': {'c': 'c'}, 'tools': { 'name': 'universal-ctags', 'version': '~git7859817b', 'configuration': { 'command_line': '''ctags --fields=+lnz --sort=no --links=no ''' # noqa '''--output-format=json ''' }, } } # Lightweight git repositories that will be loaded to generate # input data for tests _TEST_ORIGINS = [ { 'type': 'git', 'url': 'https://github.com/wcoder/highlightjs-line-numbers.js', 'archives': ['highlightjs-line-numbers.js.zip', 'highlightjs-line-numbers.js_visit2.zip'], 'visit_date': ['Dec 1 2018, 01:00 UTC', 'Jan 20 2019, 15:00 UTC'] }, { 'type': 'git', 'url': 'https://github.com/memononen/libtess2', 'archives': ['libtess2.zip'], 'visit_date': ['May 25 2018, 01:00 UTC'] }, { 'type': 'git', 'url': 'repo_with_submodules', 'archives': ['repo_with_submodules.tgz'], 'visit_date': ['Jan 1 2019, 01:00 UTC'] } ] _contents = {} # Tests data initialization def _init_tests_data(): # To hold reference to the memory storage storage = None # Create search instance search = get_search('memory', {}) search.initialize() search.origin_update({'url': origin['url']} for origin in _TEST_ORIGINS) # Load git repositories from archives for origin in _TEST_ORIGINS: for i, archive in enumerate(origin['archives']): origin_repo_archive = \ 
os.path.join(os.path.dirname(__file__), 'resources/repos/%s' % archive) loader = GitLoaderFromArchive(origin['url'], archive_path=origin_repo_archive, config=_TEST_LOADER_CONFIG, visit_date=origin['visit_date'][i]) if storage is None: storage = loader.storage else: loader.storage = storage loader.load() origin.update(storage.origin_get(origin)) # add an 'id' key if enabled search.origin_update([{'url': origin['url'], 'has_visits': True}]) for i in range(250): url = 'https://many.origins/%d' % (i+1) # storage.origin_add([{'url': url}]) storage.origin_add([Origin(url=url)]) search.origin_update([{'url': url, 'has_visits': True}]) visit = storage.origin_visit_add(url, '2019-12-03 13:55:05', 'tar') storage.origin_visit_update( url, visit.visit, snapshot='1a8893e6a86f444e8be8e7bda6cb34fb1735a00e') contents = set() directories = set() revisions = set() releases = set() snapshots = set() content_path = {} # Get all objects loaded into the test archive for origin in _TEST_ORIGINS: snp = storage.snapshot_get_latest(origin['url']) snapshots.add(hash_to_hex(snp['id'])) for branch_name, branch_data in snp['branches'].items(): if branch_data['target_type'] == 'revision': revisions.add(branch_data['target']) elif branch_data['target_type'] == 'release': release = next(storage.release_get([branch_data['target']])) revisions.add(release['target']) releases.add(hash_to_hex(branch_data['target'])) for rev_log in storage.revision_shortlog(set(revisions)): rev_id = rev_log[0] revisions.add(rev_id) for rev in storage.revision_get(revisions): dir_id = rev['directory'] directories.add(hash_to_hex(dir_id)) for entry in dir_iterator(storage, dir_id): content_path[entry['sha1']] = '/'.join( [hash_to_hex(dir_id), entry['path'].decode('utf-8')]) if entry['type'] == 'file': contents.add(entry['sha1']) elif entry['type'] == 'dir': directories.add(hash_to_hex(entry['target'])) # Get all checksums for each content result = storage.content_get_metadata(contents) contents = [] for sha1, 
contents_metadata in result.items(): for content_metadata in contents_metadata: contents.append({ algo: hash_to_hex(content_metadata[algo]) for algo in DEFAULT_ALGORITHMS }) path = content_path[sha1] cnt = next(storage.content_get([sha1])) mimetype, encoding = get_mimetype_and_encoding_for_content( cnt['data']) _, _, cnt['data'] = _re_encode_content( mimetype, encoding, cnt['data']) content_display_data = prepare_content_for_display( cnt['data'], mimetype, path) contents[-1]['path'] = path contents[-1]['mimetype'] = mimetype contents[-1]['encoding'] = encoding contents[-1]['hljs_language'] = content_display_data['language'] contents[-1]['data'] = content_display_data['content_data'] _contents[contents[-1]['sha1']] = contents[-1] # Create indexer storage instance that will be shared by indexers idx_storage = get_indexer_storage('memory', {}) # Add the empty directory to the test archive storage.directory_add([Directory(entries=[])]) # Return tests data return { 'search': search, 'storage': storage, 'idx_storage': idx_storage, 'origins': _TEST_ORIGINS, 'contents': contents, 'directories': list(directories), 'releases': list(releases), 'revisions': list(map(hash_to_hex, revisions)), 'snapshots': list(snapshots), 'generated_checksums': set(), } def _init_indexers(tests_data): # Instantiate content indexers that will be used in tests # and force them to use the memory storages indexers = {} for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer), ('license_indexer', _FossologyLicenseIndexer), ('ctags_indexer', _CtagsIndexer)): idx = idx_class() idx.storage = tests_data['storage'] idx.objstorage = tests_data['storage'].objstorage idx.idx_storage = tests_data['idx_storage'] idx.register_tools(idx.config['tools']) indexers[idx_name] = idx return indexers def get_content(content_sha1): return _contents.get(content_sha1) _tests_data = None _current_tests_data = None _indexer_loggers = {} def get_tests_data(reset=False): """ Initialize tests data and return them in 
a dict. """ global _tests_data, _current_tests_data if _tests_data is None: _tests_data = _init_tests_data() indexers = _init_indexers(_tests_data) for (name, idx) in indexers.items(): # pytest makes the loggers use a temporary file; and deepcopy # requires serializability. So we remove them, and add them # back after the copy. _indexer_loggers[name] = idx.log del idx.log _tests_data.update(indexers) if reset or _current_tests_data is None: _current_tests_data = deepcopy(_tests_data) for (name, logger) in _indexer_loggers.items(): _current_tests_data[name].log = logger return _current_tests_data def override_storages(storage, idx_storage, search): """ Helper function to replace the storages from which archive data are fetched. """ swh_config = config.get_config() swh_config.update({ 'storage': storage, 'indexer_storage': idx_storage, 'search': search, }) service.storage = storage service.idx_storage = idx_storage service.search = search - - -# Implement some special endpoints used to provide input tests data -# when executing end to end tests with cypress - -_content_code_data_exts = {} # type: Dict[str, Dict[str, str]] -_content_code_data_filenames = {} # type: Dict[str, Dict[str, str]] -_content_other_data_exts = {} # type: Dict[str, Dict[str, str]] - - -def _init_content_tests_data(data_path, data_dict, ext_key): - """ - Helper function to read the content of a directory, store it - into a test archive and add some files metadata (sha1 and/or - expected programming language) in a dict. 
- - Args: - data_path (str): path to a directory relative to the tests - folder of swh-web - data_dict (dict): the dict that will store files metadata - ext_key (bool): whether to use file extensions or filenames - as dict keys - """ - test_contents_dir = os.path.join( - os.path.dirname(__file__), data_path).encode('utf-8') - directory = from_disk.Directory.from_disk(path=test_contents_dir) - - contents = [] - for name, obj in directory.items(): - if isinstance(obj, from_disk.Content): - c = obj.to_model().with_data().to_dict() - c['status'] = 'visible' - sha1 = hash_to_hex(c['sha1']) - if ext_key: - key = name.decode('utf-8').split('.')[-1] - filename = 'test.' + key - else: - filename = name.decode('utf-8').split('/')[-1] - key = filename - language = get_hljs_language_from_filename(filename) - data_dict[key] = {'sha1': sha1, - 'language': language} - contents.append(Content.from_dict(c)) - storage = get_tests_data()['storage'] - storage.content_add(contents) - - -def _init_content_code_data_exts(): - """ - Fill a global dictionary which maps source file extension to - a code content example. - """ - global _content_code_data_exts - _init_content_tests_data('resources/contents/code/extensions', - _content_code_data_exts, True) - - -def _init_content_other_data_exts(): - """ - Fill a global dictionary which maps a file extension to - a content example. - """ - global _content_other_data_exts - _init_content_tests_data('resources/contents/other/extensions', - _content_other_data_exts, True) - - -def _init_content_code_data_filenames(): - """ - Fill a global dictionary which maps a filename to - a content example. 
- """ - global _content_code_data_filenames - _init_content_tests_data('resources/contents/code/filenames', - _content_code_data_filenames, False) - - -if config.get_config()['e2e_tests_mode']: - _init_content_code_data_exts() - _init_content_other_data_exts() - _init_content_code_data_filenames() - - -@api_view(['GET']) -def get_content_code_data_all_exts(request): - """ - Endpoint implementation returning a list of all source file - extensions to test for highlighting using cypress. - """ - return Response(sorted(_content_code_data_exts.keys()), - status=200, content_type='application/json') - - -@api_view(['GET']) -def get_content_code_data_by_ext(request, ext): - """ - Endpoint implementation returning metadata of a code content example - based on the source file extension. - """ - data = None - status = 404 - if ext in _content_code_data_exts: - data = _content_code_data_exts[ext] - status = 200 - return Response(data, status=status, content_type='application/json') - - -@api_view(['GET']) -def get_content_other_data_by_ext(request, ext): - """ - Endpoint implementation returning metadata of a content example - based on the file extension. - """ - _init_content_other_data_exts() - data = None - status = 404 - if ext in _content_other_data_exts: - data = _content_other_data_exts[ext] - status = 200 - return Response(data, status=status, content_type='application/json') - - -@api_view(['GET']) -def get_content_code_data_all_filenames(request): - """ - Endpoint implementation returning a list of all source filenames - to test for highlighting using cypress. - """ - return Response(sorted(_content_code_data_filenames.keys()), - status=200, content_type='application/json') - - -@api_view(['GET']) -def get_content_code_data_by_filename(request, filename): - """ - Endpoint implementation returning metadata of a code content example - based on the source filename. 
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

# Implement some special endpoints used to provide input tests data
# when executing end to end tests with cypress

import os

from typing import Dict

from rest_framework.decorators import api_view
from rest_framework.response import Response

from swh.model import from_disk
from swh.model.hashutil import hash_to_hex
from swh.model.model import Content
from swh.web.common.highlightjs import get_hljs_language_from_filename
from swh.web.config import get_config
from swh.web.tests.data import get_tests_data

# maps a source file extension to the sha1 and expected highlightjs
# language of a sample code content
_content_code_data_exts: Dict[str, Dict[str, str]] = {}
# maps a source filename to the sha1 and expected highlightjs
# language of a sample code content
_content_code_data_filenames: Dict[str, Dict[str, str]] = {}
# maps a file extension to the sha1 of a sample non-code content
_content_other_data_exts: Dict[str, Dict[str, str]] = {}


def _init_content_tests_data(data_path, data_dict, ext_key):
    """
    Helper function to read the content of a directory, store it
    into a test archive and add some files metadata (sha1 and/or
    expected programming language) in a dict.

    Args:
        data_path (str): path to a directory relative to the tests
            folder of swh-web
        data_dict (dict): the dict that will store files metadata
        ext_key (bool): whether to use file extensions or filenames
            as dict keys
    """
    test_contents_dir = os.path.join(
        os.path.dirname(__file__), data_path).encode('utf-8')
    directory = from_disk.Directory.from_disk(path=test_contents_dir)

    contents = []
    for name, obj in directory.items():
        if isinstance(obj, from_disk.Content):
            c = obj.to_model().with_data().to_dict()
            c['status'] = 'visible'
            sha1 = hash_to_hex(c['sha1'])
            if ext_key:
                # index by extension; a synthetic filename keeps the
                # highlightjs language detection working
                key = name.decode('utf-8').split('.')[-1]
                filename = 'test.' + key
            else:
                # index by bare filename (e.g. 'Makefile')
                filename = name.decode('utf-8').split('/')[-1]
                key = filename
            language = get_hljs_language_from_filename(filename)
            data_dict[key] = {'sha1': sha1,
                              'language': language}
            contents.append(Content.from_dict(c))
    storage = get_tests_data()['storage']
    storage.content_add(contents)


def _init_content_code_data_exts():
    """
    Fill a global dictionary which maps source file extension to
    a code content example.
    """
    global _content_code_data_exts
    _init_content_tests_data('resources/contents/code/extensions',
                             _content_code_data_exts, True)


def _init_content_other_data_exts():
    """
    Fill a global dictionary which maps a file extension to
    a content example.
    """
    global _content_other_data_exts
    _init_content_tests_data('resources/contents/other/extensions',
                             _content_other_data_exts, True)


def _init_content_code_data_filenames():
    """
    Fill a global dictionary which maps a filename to
    a content example.
    """
    global _content_code_data_filenames
    _init_content_tests_data('resources/contents/code/filenames',
                             _content_code_data_filenames, False)


# only pay the cost of scanning the sample content directories when the
# application is started for cypress end-to-end tests
if get_config()['e2e_tests_mode']:
    _init_content_code_data_exts()
    _init_content_other_data_exts()
    _init_content_code_data_filenames()


def _contents_data_response(data_dict, key):
    """
    Build the JSON response for a single content-metadata lookup:
    the metadata stored under ``key`` in ``data_dict`` with a 200
    status, or a null payload with a 404 status when ``key`` is
    unknown.  Shared by the four lookup endpoints below.
    """
    data = None
    status = 404
    if key in data_dict:
        data = data_dict[key]
        status = 200
    return Response(data, status=status, content_type='application/json')


@api_view(['GET'])
def get_content_code_data_all_exts(request):
    """
    Endpoint implementation returning a list of all source file
    extensions to test for highlighting using cypress.
    """
    return Response(sorted(_content_code_data_exts.keys()),
                    status=200, content_type='application/json')


@api_view(['GET'])
def get_content_code_data_by_ext(request, ext):
    """
    Endpoint implementation returning metadata of a code content example
    based on the source file extension.
    """
    return _contents_data_response(_content_code_data_exts, ext)


@api_view(['GET'])
def get_content_other_data_by_ext(request, ext):
    """
    Endpoint implementation returning metadata of a content example
    based on the file extension.
    """
    # re-populates the dict (and re-adds the contents to the test
    # storage) on every request; NOTE(review): presumably needed because
    # the in-memory test storage can be reset between cypress tests --
    # confirm before removing
    _init_content_other_data_exts()
    return _contents_data_response(_content_other_data_exts, ext)


@api_view(['GET'])
def get_content_code_data_all_filenames(request):
    """
    Endpoint implementation returning a list of all source filenames
    to test for highlighting using cypress.
    """
    return Response(sorted(_content_code_data_filenames.keys()),
                    status=200, content_type='application/json')


@api_view(['GET'])
def get_content_code_data_by_filename(request, filename):
    """
    Endpoint implementation returning metadata of a code content example
    based on the source filename.
    """
    return _contents_data_response(_content_code_data_filenames, filename)