diff --git a/cypress/integration/layout.spec.js b/cypress/integration/layout.spec.js
--- a/cypress/integration/layout.spec.js
+++ b/cypress/integration/layout.spec.js
@@ -11,7 +11,7 @@
it('should should contain all navigation links', function() {
cy.visit(url);
cy.get('.swh-top-bar a')
- .should('have.length', 4)
+ .should('have.length.of.at.least', 4)
.and('be.visible')
.and('have.attr', 'href');
});
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -21,6 +21,9 @@
[mypy-htmlmin.*]
ignore_missing_imports = True
+[mypy-keycloak.*]
+ignore_missing_imports = True
+
[mypy-magic.*]
ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,6 +18,7 @@
python-dateutil
pyyaml
requests
+python-keycloak >= 0.19.0
python-memcached
pybadges
sentry-sdk
@@ -26,5 +27,3 @@
# Doc dependencies
sphinx
sphinxcontrib-httpdomain
-
-
diff --git a/swh/web/admin/urls.py b/swh/web/admin/urls.py
--- a/swh/web/admin/urls.py
+++ b/swh/web/admin/urls.py
@@ -4,7 +4,7 @@
# See top-level LICENSE file for more information
from django.conf.urls import url
-from django.contrib.auth.views import LoginView, LogoutView
+from django.contrib.auth.views import LoginView
from django.shortcuts import redirect
from swh.web.admin.adminurls import AdminUrls
@@ -17,12 +17,11 @@
return redirect('admin-origin-save')
-urlpatterns = [url(r'^$', _admin_default_view, name='admin'),
- url(r'^login/$',
- LoginView.as_view(template_name='login.html'),
- name='login'),
- url(r'^logout/$',
- LogoutView.as_view(template_name='logout.html'),
- name='logout')]
+urlpatterns = [
+ url(r'^$', _admin_default_view, name='admin'),
+ url(r'^login/$',
+ LoginView.as_view(template_name='login.html'),
+ name='login'),
+]
urlpatterns += AdminUrls.get_url_patterns()
diff --git a/swh/web/assets/src/bundles/webapp/webapp.css b/swh/web/assets/src/bundles/webapp/webapp.css
--- a/swh/web/assets/src/bundles/webapp/webapp.css
+++ b/swh/web/assets/src/bundles/webapp/webapp.css
@@ -295,6 +295,11 @@
color: #fecd1b;
}
+.swh-position-left {
+ position: absolute;
+ left: 0;
+}
+
.swh-position-right {
position: absolute;
right: 0;
diff --git a/swh/web/auth/__init__.py b/swh/web/auth/__init__.py
new file mode 100644
diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/backends.py
@@ -0,0 +1,118 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime, timedelta
+from typing import Any, Dict, Optional, Tuple
+
+from django.core.cache import cache
+from django.http import HttpRequest
+import sentry_sdk
+
+from swh.web.auth.keycloak import KeycloakOpenIDConnect
+from swh.web.auth.utils import get_oidc_client
+from swh.web.auth.models import OIDCUser
+
+
+# OpenID Connect client to communicate with Keycloak server
+_oidc_client: KeycloakOpenIDConnect = get_oidc_client()
+
+
+def _oidc_user_from_info(userinfo: Dict[str, Any]) -> OIDCUser:
+ # compute an integer user identifier for Django User model
+ # by concatenating all groups of the UUID4 user identifier
+ # generated by Keycloak and converting it from hex to decimal
+ user_id = int(''.join(userinfo['sub'].split('-')), 16)
+
+ # create a Django user that will not be saved to database
+ user = OIDCUser(id=user_id,
+ username=userinfo['preferred_username'],
+ password='',
+ first_name=userinfo['given_name'],
+ last_name=userinfo['family_name'],
+ email=userinfo['email'])
+
+ # set is_staff user property based on groups
+ user.is_staff = '/staff' in userinfo['groups']
+
+ # add userinfo sub to custom User proxy model
+ user.sub = userinfo['sub']
+
+ return user
+
+
+def _oidc_user_from_profile(oidc_profile: Dict[str, Any],
+ userinfo: Optional[Dict[str, Any]] = None
+ ) -> Tuple[OIDCUser, Dict[str, Any]]:
+ # get access token
+ access_token = oidc_profile['access_token']
+
+ # request OIDC userinfo
+ if userinfo is None:
+ userinfo = _oidc_client.userinfo(access_token)
+
+ # create OIDCUser from userinfo
+ user = _oidc_user_from_info(userinfo)
+
+ # decode JWT token
+ decoded_token = _oidc_client.decode_token(access_token)
+
+ # get authentication init datetime
+ auth_datetime = datetime.fromtimestamp(decoded_token['auth_time'])
+
+ # compute OIDC tokens expiration date
+ oidc_profile['access_expiration'] = (
+ auth_datetime +
+ timedelta(seconds=oidc_profile['expires_in']))
+ oidc_profile['refresh_expiration'] = (
+ auth_datetime +
+ timedelta(seconds=oidc_profile['refresh_expires_in']))
+
+ # add OIDC profile data to custom User proxy model
+ for key, val in oidc_profile.items():
+ if hasattr(user, key):
+ setattr(user, key, val)
+
+ return user, userinfo
+
+
+class OIDCAuthorizationCodePKCEBackend:
+
+ def authenticate(self, request: HttpRequest, code: str, code_verifier: str,
+ redirect_uri: str) -> Optional[OIDCUser]:
+
+ user = None
+ try:
+ # try to authenticate user with OIDC PKCE authorization code flow
+ oidc_profile = _oidc_client.authorization_code(
+ code, redirect_uri, code_verifier=code_verifier)
+
+ # create Django user
+ user, userinfo = _oidc_user_from_profile(oidc_profile)
+
+ # save authenticated user data in cache
+ cache.set(f'user_{user.id}',
+ {'userinfo': userinfo, 'oidc_profile': oidc_profile},
+ timeout=oidc_profile['refresh_expires_in'])
+ except Exception as e:
+ sentry_sdk.capture_exception(e)
+
+ return user
+
+ def get_user(self, user_id: int) -> Optional[OIDCUser]:
+ # get user data from cache
+ user_oidc_data = cache.get(f'user_{user_id}')
+ if user_oidc_data:
+ try:
+ user, _ = _oidc_user_from_profile(
+ user_oidc_data['oidc_profile'], user_oidc_data['userinfo'])
+ # restore auth backend
+ setattr(user, 'backend',
+ f'{__name__}.{self.__class__.__name__}')
+ return user
+ except Exception as e:
+ sentry_sdk.capture_exception(e)
+ return None
+ else:
+ return None
diff --git a/swh/web/auth/keycloak.py b/swh/web/auth/keycloak.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/keycloak.py
@@ -0,0 +1,162 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, Optional, Tuple
+from urllib.parse import urlencode
+
+from keycloak import KeycloakOpenID
+
+
+class KeycloakOpenIDConnect:
+ """
+ Wrapper class around python-keycloak to ease the interaction with Keycloak
+ for managing authentication and user permissions with OpenID Connect.
+ """
+
+ def __init__(self, server_url: str, realm_name: str, client_id: str,
+ realm_public_key: str = ''):
+ """
+ Args:
+ server_url: URL of the Keycloak server
+ realm_name: The realm name
+ client_id: The OpenID Connect client identifier
+ realm_public_key: The realm public key (will be dynamically
+ retrieved if not provided)
+ """
+ self._keycloak = KeycloakOpenID(
+ server_url=server_url,
+ client_id=client_id,
+ realm_name=realm_name,
+ )
+
+ self.server_url = server_url
+ self.realm_name = realm_name
+ self.client_id = client_id
+ self.realm_public_key = realm_public_key
+
+ def well_known(self) -> Dict[str, Any]:
+ """
+ Retrieve the OpenID Connect Well-Known URI registry from Keycloak.
+
+ Returns:
+ A dictionary filled with OpenID Connect URIs.
+ """
+ return self._keycloak.well_know()
+
+ def authorization_url(self, redirect_uri: str,
+ **extra_params: str) -> str:
+ """
+ Get OpenID Connect authorization URL to authenticate users.
+
+ Args:
+ redirect_uri: URI to redirect to once a user is authenticated
+ extra_params: Extra query parameters to add to the
+ authorization URL
+ """
+ auth_url = self._keycloak.auth_url(redirect_uri)
+ if extra_params:
+ auth_url += '&%s' % urlencode(extra_params)
+ return auth_url
+
+ def authorization_code(self, code: str, redirect_uri: str,
+ **extra_params: str) -> Dict[str, Any]:
+ """
+ Get OpenID Connect authentication tokens using Authorization
+ Code flow.
+
+ Args:
+ code: Authorization code provided by Keycloak
+ redirect_uri: URI to redirect to once a user is authenticated
+ (must be the same as the one provided to authorization_url)
+ extra_params: Extra parameters to add in the authorization request
+ payload.
+ """
+ return self._keycloak.token(
+ grant_type='authorization_code',
+ code=code,
+ redirect_uri=redirect_uri,
+ **extra_params)
+
+ def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
+ """
+ Request a new access token from Keycloak using a refresh token.
+
+ Args:
+ refresh_token: A refresh token provided by Keycloak
+
+ Returns:
+ A dictionary filled with tokens info
+ """
+ return self._keycloak.refresh_token(refresh_token)
+
+ def decode_token(self, token: str,
+ options: Optional[Dict[str, Any]] = None
+ ) -> Dict[str, Any]:
+ """
+ Try to decode a JWT token.
+
+ Args:
+ token: A JWT token to decode
+ options: Options for jose.jwt.decode
+
+ Returns:
+ A dictionary filled with decoded token content
+ """
+ if not self.realm_public_key:
+ realm_public_key = self._keycloak.public_key()
+ self.realm_public_key = '-----BEGIN PUBLIC KEY-----\n'
+ self.realm_public_key += realm_public_key
+ self.realm_public_key += '\n-----END PUBLIC KEY-----'
+
+ return self._keycloak.decode_token(token, key=self.realm_public_key,
+ options=options)
+
+ def logout(self, refresh_token: str) -> None:
+ """
+ Logout a user by closing its authenticated session.
+
+ Args:
+ refresh_token: A refresh token provided by Keycloak
+ """
+ self._keycloak.logout(refresh_token)
+
+ def userinfo(self, access_token: str) -> Dict[str, Any]:
+ """
+ Return user information from its access token.
+
+ Args:
+ access_token: An access token provided by Keycloak
+
+ Returns:
+ A dictionary filled with user information
+ """
+ return self._keycloak.userinfo(access_token)
+
+
+# stores instances of KeycloakOpenIDConnect class
+# dict keys are (realm_name, client_id) tuples
+_keycloak_oidc: Dict[Tuple[str, str], KeycloakOpenIDConnect] = {}
+
+
+def get_keycloak_oidc_client(server_url: str, realm_name: str,
+ client_id: str) -> KeycloakOpenIDConnect:
+ """
+ Instantiate a KeycloakOpenIDConnect class for a given client in a
+ given realm.
+
+ Args:
+ server_url: Base URL of a Keycloak server
+ realm_name: Name of the realm in Keycloak
+ client_id: Client identifier in the realm
+
+ Returns:
+ An object to ease the interaction with the Keycloak server
+ """
+ realm_client_key = (realm_name, client_id)
+ if realm_client_key not in _keycloak_oidc:
+ _keycloak_oidc[realm_client_key] = KeycloakOpenIDConnect(server_url,
+ realm_name,
+ client_id)
+ return _keycloak_oidc[realm_client_key]
diff --git a/swh/web/auth/models.py b/swh/web/auth/models.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/models.py
@@ -0,0 +1,43 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime
+from typing import Optional
+
+from django.contrib.auth.models import User
+
+
+class OIDCUser(User):
+ """
+ Custom User proxy model for remote users storing OpenID Connect
+ related data: profile containing authorization tokens and userinfo.
+
+ The model is also not saved to database as all users are already stored
+ in the Keycloak one.
+ """
+
+ # OIDC subject identifier
+ sub: str = ''
+
+ # OIDC tokens and session related data, only relevant when a user
+ # authenticates from a web browser
+ access_token: Optional[str] = None
+ access_expiration: Optional[datetime] = None
+ id_token: Optional[str] = None
+ refresh_token: Optional[str] = None
+ refresh_expiration: Optional[datetime] = None
+ scope: Optional[str] = None
+ session_state: Optional[str] = None
+
+ class Meta:
+ app_label = 'swh.web.auth'
+ proxy = True
+
+ def save(self, **kwargs):
+ """
+ Override django.db.models.Model.save to avoid saving the remote
+ users to web application database.
+ """
+ pass
diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/utils.py
@@ -0,0 +1,62 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import hashlib
+import secrets
+
+from base64 import urlsafe_b64encode
+from typing import Tuple
+
+from django.conf import settings
+
+from swh.web.auth.keycloak import (
+ KeycloakOpenIDConnect, get_keycloak_oidc_client
+)
+from swh.web.config import get_config
+
+
+def gen_oidc_pkce_codes() -> Tuple[str, str]:
+ """
+ Generates a code verifier and a code challenge to be used
+ with the OpenID Connect authorization code flow with PKCE
+ ("Proof Key for Code Exchange", see https://tools.ietf.org/html/rfc7636).
+
+ PKCE replaces the static secret used in the standard authorization
+ code flow with a temporary one-time challenge, making it feasible
+ to use in public clients.
+
+ The implementation is inspired by this blog post:
+ https://www.stefaanlippens.net/oauth-code-flow-pkce.html
+ """
+ # generate a code verifier: a random URL-safe string built from 60
+ # random bytes, only to be used "client side"
+ code_verifier_str = secrets.token_urlsafe(60)
+
+ # create the PKCE code challenge by hashing the code verifier with SHA256
+ # and encoding the result in URL-safe base64 (without padding)
+ code_challenge = hashlib.sha256(code_verifier_str.encode('ascii')).digest()
+ code_challenge_str = urlsafe_b64encode(code_challenge).decode('ascii')
+ code_challenge_str = code_challenge_str.replace('=', '')
+
+ return code_verifier_str, code_challenge_str
+
+
+def get_oidc_client(client_id: str = '') -> KeycloakOpenIDConnect:
+ """
+ Instantiate a KeycloakOpenIDConnect class for a given client in the
+ SoftwareHeritage realm.
+
+ Args:
+ client_id: client identifier in the SoftwareHeritage realm
+
+ Returns:
+ An object to ease the interaction with the Keycloak server
+ """
+ if not client_id:
+ client_id = settings.OIDC_SWH_WEB_CLIENT_ID
+ swhweb_config = get_config()
+ return get_keycloak_oidc_client(swhweb_config['keycloak']['server_url'],
+ swhweb_config['keycloak']['realm_name'],
+ client_id)
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/views.py
@@ -0,0 +1,122 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import uuid
+
+from typing import cast
+
+from django.conf.urls import url
+from django.core.cache import cache
+from django.contrib.auth import authenticate, login, logout
+from django.http import HttpRequest
+from django.http.response import HttpResponse, HttpResponseRedirect
+
+from swh.web.auth.models import OIDCUser
+from swh.web.auth.utils import gen_oidc_pkce_codes, get_oidc_client
+from swh.web.common.exc import handle_view_exception, BadInputExc
+from swh.web.common.utils import reverse
+
+
+def oidc_login(request: HttpRequest) -> HttpResponse:
+ """
+ Django view to initiate login process using OpenID Connect.
+ """
+ # generate a CSRF token
+ state = str(uuid.uuid4())
+ redirect_uri = reverse('oidc-login-complete', request=request)
+
+ code_verifier, code_challenge = gen_oidc_pkce_codes()
+
+ request.session['login_data'] = {
+ 'code_verifier': code_verifier,
+ 'state': state,
+ 'redirect_uri': redirect_uri,
+ 'next_path': request.GET.get('next_path'),
+ }
+
+ authorization_url_params = {
+ 'state': state,
+ 'code_challenge': code_challenge,
+ 'code_challenge_method': 'S256',
+ 'scope': 'openid',
+ }
+
+ try:
+ oidc_client = get_oidc_client()
+ authorization_url = oidc_client.authorization_url(
+ redirect_uri, **authorization_url_params)
+
+ return HttpResponseRedirect(authorization_url)
+ except Exception as e:
+ return handle_view_exception(request, e)
+
+
+def oidc_login_complete(request: HttpRequest) -> HttpResponse:
+    """
+    Django view to finalize login process using OpenID Connect.
+    """
+    try:
+        if 'login_data' not in request.session:
+            raise Exception('Login process has not been initialized.')
+        # both parameters are mandatory: reject the request if either is missing
+        if 'code' not in request.GET or 'state' not in request.GET:
+            raise BadInputExc('Missing query parameters for authentication.')
+
+        # get CSRF token returned by OIDC server
+        state = request.GET['state']
+
+        login_data = request.session['login_data']
+
+        if state != login_data['state']:
+            raise BadInputExc('Wrong CSRF token, aborting login process.')
+
+        user = authenticate(request=request,
+                            code=request.GET['code'],
+                            code_verifier=login_data['code_verifier'],
+                            redirect_uri=login_data['redirect_uri'])
+
+        if user is None:
+            raise Exception('User authentication failed.')
+
+        login(request, user)
+        # login data are single-use: drop them once the user is logged in
+        del request.session['login_data']
+
+        redirect_url = (login_data['next_path'] or
+                        request.build_absolute_uri('/'))
+
+        return HttpResponseRedirect(redirect_url)
+    except Exception as e:
+        return handle_view_exception(request, e)
+
+
+def oidc_logout(request: HttpRequest) -> HttpResponse:
+ """
+ Django view to logout using OpenID Connect.
+ """
+ try:
+ user = request.user
+ logout(request)
+ if hasattr(user, 'refresh_token'):
+ oidc_client = get_oidc_client()
+ user = cast(OIDCUser, user)
+ refresh_token = cast(str, user.refresh_token)
+ # end OpenID Connect session
+ oidc_client.logout(refresh_token)
+ # remove user data from cache
+ cache.delete(f'user_{user.id}')
+
+ logout_url = reverse('logout', query_params={'remote_user': 1})
+ return HttpResponseRedirect(request.build_absolute_uri(logout_url))
+ except Exception as e:
+ return handle_view_exception(request, e)
+
+
+urlpatterns = [
+ url(r'^oidc/login/$', oidc_login, name='oidc-login'),
+ url(r'^oidc/login-complete/$', oidc_login_complete,
+ name='oidc-login-complete'),
+ url(r'^oidc/logout/$', oidc_logout, name='oidc-logout'),
+]
diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -343,10 +343,16 @@
Django context processor used to inject variables
in all swh-web templates.
"""
+ config = get_config()
+ if request.user.is_authenticated and not hasattr(request.user, 'backend'):
+ # To avoid django.template.base.VariableDoesNotExist errors
+ # when rendering templates when standard Django user is logged in.
+ request.user.backend = 'django.contrib.auth.backends.ModelBackend'
return {
'swh_object_icons': swh_object_icons,
'available_languages': None,
- 'swh_client_config': get_config()['client_config'],
+ 'swh_client_config': config['client_config'],
+ 'oidc_enabled': bool(config['keycloak']['server_url']),
}
diff --git a/swh/web/config.py b/swh/web/config.py
--- a/swh/web/config.py
+++ b/swh/web/config.py
@@ -110,6 +110,10 @@
'es_workers_index_url': ('string', ''),
'history_counters_url': ('string', 'https://stats.export.softwareheritage.org/history_counters.json'), # noqa
'client_config': ('dict', {}),
+ 'keycloak': ('dict', {
+ 'server_url': '',
+ 'realm_name': ''
+ }),
}
swhweb_config = {} # type: Dict[str, Any]
diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py
--- a/swh/web/settings/common.py
+++ b/swh/web/settings/common.py
@@ -45,7 +45,7 @@
'swh.web.browse',
'webpack_loader',
'django_js_reverse',
- 'corsheaders'
+ 'corsheaders',
]
MIDDLEWARE = [
@@ -57,7 +57,7 @@
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
- 'swh.web.common.middlewares.ThrottlingHeadersMiddleware'
+ 'swh.web.common.middlewares.ThrottlingHeadersMiddleware',
]
# Compress all assets (static ones and dynamically generated html)
@@ -289,3 +289,10 @@
CORS_ORIGIN_ALLOW_ALL = True
CORS_URLS_REGEX = r'^/badge/.*$'
+
+AUTHENTICATION_BACKENDS = [
+ 'django.contrib.auth.backends.ModelBackend',
+ 'swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend',
+]
+
+OIDC_SWH_WEB_CLIENT_ID = 'swh-web'
diff --git a/swh/web/settings/tests.py b/swh/web/settings/tests.py
--- a/swh/web/settings/tests.py
+++ b/swh/web/settings/tests.py
@@ -80,7 +80,11 @@
'exempted_networks': ['127.0.0.0/8']
}
}
- }
+ },
+ 'keycloak': {
+ 'server_url': 'http://localhost:8080/auth',
+ 'realm_name': 'SoftwareHeritage',
+ },
})
diff --git a/swh/web/templates/layout.html b/swh/web/templates/layout.html
--- a/swh/web/templates/layout.html
+++ b/swh/web/templates/layout.html
@@ -71,6 +71,11 @@
+ {% if oidc_enabled %}
+ -
+ Donate
+
+ {% endif %}
-
Home
@@ -81,9 +86,20 @@
Documentation
-
- {% if user.is_authenticated and user.is_staff %}
+ {% url 'logout' as logout_url %}
+ {% if user.is_authenticated %}
Logged in as {{ user.username }},
- logout
+ {% if 'OIDC' in user.backend %}
+ logout
+ {% else %}
+ logout
+ {% endif %}
+ {% elif oidc_enabled %}
+ {% if request.path != logout_url %}
+ login
+ {% else %}
+ login
+ {% endif %}
{% else %}
Donate
{% endif %}
@@ -160,7 +176,7 @@
Help
- {% if user.is_authenticated %}
+ {% if user.is_authenticated and user.is_staff %}
-
diff --git a/swh/web/templates/logout.html b/swh/web/templates/logout.html
--- a/swh/web/templates/logout.html
+++ b/swh/web/templates/logout.html
@@ -17,5 +17,11 @@
{% block content %}
You have been successfully logged out.
-Log in again.
+
+{% if oidc_enabled and 'remote_user' in request.GET %}
+
+{% else %}
+
+{% endif %}
+Log in again.
{% endblock %}
diff --git a/swh/web/tests/auth/__init__.py b/swh/web/tests/auth/__init__.py
new file mode 100644
diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/keycloak_mock.py
@@ -0,0 +1,77 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from copy import copy
+from unittest.mock import Mock
+
+from django.conf import settings
+from django.utils import timezone
+
+from swh.web.auth.keycloak import KeycloakOpenIDConnect
+from swh.web.config import get_config
+
+from .sample_data import oidc_profile, realm_public_key, userinfo
+
+
+class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
+
+ def __init__(self, auth_success=True):
+ swhweb_config = get_config()
+ super().__init__(swhweb_config['keycloak']['server_url'],
+ swhweb_config['keycloak']['realm_name'],
+ settings.OIDC_SWH_WEB_CLIENT_ID)
+ self._keycloak.public_key = lambda: realm_public_key
+ self._keycloak.well_know = lambda: {
+ 'issuer': f'{self.server_url}realms/{self.realm_name}',
+ 'authorization_endpoint': (f'{self.server_url}realms/'
+ f'{self.realm_name}/protocol/'
+ 'openid-connect/auth'),
+ 'token_endpoint': (f'{self.server_url}realms/{self.realm_name}/'
+ 'protocol/openid-connect/token'),
+ 'token_introspection_endpoint': (f'{self.server_url}realms/'
+ f'{self.realm_name}/protocol/'
+ 'openid-connect/token/'
+ 'introspect'),
+ 'userinfo_endpoint': (f'{self.server_url}realms/{self.realm_name}/'
+ 'protocol/openid-connect/userinfo'),
+ 'end_session_endpoint': (f'{self.server_url}realms/'
+ f'{self.realm_name}/protocol/'
+ 'openid-connect/logout'),
+ 'jwks_uri': (f'{self.server_url}realms/{self.realm_name}/'
+ 'protocol/openid-connect/certs'),
+ }
+ self.authorization_code = Mock()
+ self.userinfo = Mock()
+ self.logout = Mock()
+ if auth_success:
+ self.authorization_code.return_value = copy(oidc_profile)
+ self.userinfo.return_value = copy(userinfo)
+ else:
+ self.authorization_url = Mock()
+ exception = Exception('Authentication failed')
+ self.authorization_code.side_effect = exception
+ self.authorization_url.side_effect = exception
+ self.userinfo.side_effect = exception
+ self.logout.side_effect = exception
+
+ def decode_token(self, token):
+ # skip signature expiration check as we use a static oidc_profile
+ # for the tests with expired tokens in it
+ options = {'verify_exp': False}
+ decoded = super().decode_token(token, options)
+ # tweak auth and exp time for tests
+ expire_in = decoded['exp'] - decoded['auth_time']
+ decoded['auth_time'] = int(timezone.now().timestamp())
+ decoded['exp'] = decoded['auth_time'] + expire_in
+ return decoded
+
+
+def mock_keycloak(mocker, auth_success=True):
+ kc_oidc_mock = KeycloackOpenIDConnectMock(auth_success)
+ mock_get_oidc_client = mocker.patch(
+ 'swh.web.auth.views.get_oidc_client')
+ mock_get_oidc_client.return_value = kc_oidc_mock
+ mocker.patch('swh.web.auth.backends._oidc_client', kc_oidc_mock)
+ return kc_oidc_mock
diff --git a/swh/web/tests/auth/sample_data.py b/swh/web/tests/auth/sample_data.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/sample_data.py
@@ -0,0 +1,95 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+realm_public_key = (
+ 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnqF4xvGjaI54P6WtJvyGayxP8A93u'
+ 'NcA3TH6jitwmyAalj8dN8/NzK9vrdlSA3Ibvp/XQujPSOP7a35YiYFscEJnogTXQpE/FhZrUY'
+ 'y21U6ezruVUv4z/ER1cYLb+q5ZI86nXSTNCAbH+lw7rQjlvcJ9KvgHEeA5ALXJ1r55zUmNvuy'
+ '5o6ke1G3fXbNSXwF4qlWAzo1o7Ms8qNrNyOG8FPx24dvm9xMH7/08IPvh9KUqlnP8h6olpxHr'
+ 'drX/q4E+Nzj8Tr8p7Z5CimInls40QuOTIhs6C2SwFHUgQgXl9hB9umiZJlwYEpDv0/LO2zYie'
+ 'Hl5Lv7Iig4FOIXIVCaDGQIDAQAB'
+)
+
+oidc_profile = {
+ 'access_token': ('eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhV'
+ 'Q0p0TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.'
+ 'eyJqdGkiOiIzMWZjNTBiNy1iYmU1LTRmNTEtOTFlZi04ZTNlZWM1MTMz'
+ 'MWUiLCJleHAiOjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIz'
+ 'MTAxLCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFs'
+ 'bXMvU29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6WyJzd2gtd2ViIiwiYWNj'
+ 'b3VudCJdLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRhNjUtYTIzNi0xNGY2'
+ 'MWU2YjcyMDAiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJzd2gtd2ViIiwi'
+ 'YXV0aF90aW1lIjoxNTgyNzIzMTAwLCJzZXNzaW9uX3N0YXRlIjoiZDgy'
+ 'YjkwZDEtMGE5NC00ZTc0LWFkNjYtZGQ5NTM0MWM3YjZkIiwiYWNyIjoi'
+ 'MSIsImFsbG93ZWQtb3JpZ2lucyI6WyIqIl0sInJlYWxtX2FjY2VzcyI6'
+ 'eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0'
+ 'aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xl'
+ 'cyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtz'
+ 'Iiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwg'
+ 'cHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwibmFtZSI6Ikpv'
+ 'aG4gRG9lIiwiZ3JvdXBzIjpbXSwicHJlZmVycmVkX3VzZXJuYW1lIjoi'
+ 'am9obmRvZSIsImdpdmVuX25hbWUiOiJKb2huIiwiZmFtaWx5X25hbWUi'
+ 'OiJEb2UiLCJlbWFpbCI6ImpvaG4uZG9lQGV4YW1wbGUuY29tIn0.neJ-'
+ 'Pmd87J6Gt0fzDqmXFeoy34Iqb5vNNEEgIKqtqg3moaVkbXrO_9R37DJB'
+ 'AgdFv0owVONK3GbqPOEICePgG6RFtri999DetNE-O5sB4fwmHPWcHPlO'
+ 'kcPLbVJqu6zWo-2AzlfAy5bCNvj_wzs2tjFjLeHcRgR1a1WY3uTp5EWc'
+ 'HITCWQZzZWFGZTZCTlGkpdyJTqxGBdSHRB4NlIVGpYSTBsBsxttFEetl'
+ 'rpcNd4-5AteFprIr9hn9VasIIF8WdFdtC2e8xGMJW5Q0M3G3Iu-LLNmE'
+ 'oTIDqtbJ7OrIcGBIwsc3seCV3eCG6kOYwz5w-f8DeOpwcDX58yYPmapJ'
+ '6A'),
+ 'expires_in': 600,
+ 'id_token': ('eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhVQ0p0'
+ 'TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.eyJqdGki'
+ 'OiI0NDRlYzU1My1iYzhiLTQ2YjYtOTlmYS0zOTc3YTJhZDY1ZmEiLCJleHAi'
+ 'OjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxLCJpc3MiOiJo'
+ 'dHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMvU29mdHdhcmVIZXJp'
+ 'dGFnZSIsImF1ZCI6InN3aC13ZWIiLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRh'
+ 'NjUtYTIzNi0xNGY2MWU2YjcyMDAiLCJ0eXAiOiJJRCIsImF6cCI6InN3aC13'
+ 'ZWIiLCJhdXRoX3RpbWUiOjE1ODI3MjMxMDAsInNlc3Npb25fc3RhdGUiOiJk'
+ 'ODJiOTBkMS0wYTk0LTRlNzQtYWQ2Ni1kZDk1MzQxYzdiNmQiLCJhY3IiOiIx'
+ 'IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJuYW1lIjoiSm9obiBEb2UiLCJn'
+ 'cm91cHMiOltdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2'
+ 'ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoi'
+ 'am9obi5kb2VAZXhhbXBsZS5jb20ifQ.YB7bxlz_wgLJSkylVjmqedxQgEMee'
+ 'JOdi9CFHXV4F3ZWsEZ52CGuJXsozkX2oXvgU06MzzLNEK8ojgrPSNzjRkutL'
+ 'aaLq_YUzv4iV8fmKUS_aEyiYZbfoBe3Y4dwv2FoPEPCt96iTwpzM5fg_oYw_'
+ 'PHCq-Yl5SulT1nTrJZpntkf0hRjmxlDO06JMp0aZ8xS8RYJqH48xCRf_DARE'
+ '0jJV2-UuzOWI6xBATwFfP44kV6wFmErLN5txMgwZzCSB2OCe5Cl1il0eTQTN'
+ 'ybeSYZeZE61QtuTRUHeP1D1qSbJGy5g_S67SdTkS-hQFvfrrD84qGflIEqnX'
+ 'ZbYnitD1Typ6Q'),
+ 'not-before-policy': 0,
+ 'refresh_expires_in': 1800,
+ 'refresh_token': ('eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmNjM'
+ 'zMDE5MS01YTU4LTQxMDAtOGIzYS00ZDdlM2U1NjA3MTgifQ.eyJqdGk'
+ 'iOiIxYWI5ZWZmMS0xZWZlLTQ3MDMtOGQ2YS03Nzg1NWUwYzQyYTYiLC'
+ 'JleHAiOjE1ODI3MjQ5MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxL'
+ 'CJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMv'
+ 'U29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6Imh0dHA6Ly9sb2NhbGhvc3Q'
+ '6ODA4MC9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwic3ViIj'
+ 'oiZmVhY2QzNDQtYjQ2OC00YTY1LWEyMzYtMTRmNjFlNmI3MjAwIiwid'
+ 'HlwIjoiUmVmcmVzaCIsImF6cCI6InN3aC13ZWIiLCJhdXRoX3RpbWUi'
+ 'OjAsInNlc3Npb25fc3RhdGUiOiJkODJiOTBkMS0wYTk0LTRlNzQtYWQ'
+ '2Ni1kZDk1MzQxYzdiNmQiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOl'
+ 'sib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwic'
+ 'mVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFu'
+ 'YWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXc'
+ 'tcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGVtYWlsIHByb2ZpbG'
+ 'UifQ.xQYrl2CMP_GQ_TFqhsTz-rTs3WuZz5I37toi1eSsDMI'),
+ 'scope': 'openid email profile',
+ 'session_state': 'd82b90d1-0a94-4e74-ad66-dd95341c7b6d',
+ 'token_type': 'bearer'
+}
+
+userinfo = {
+ 'email': 'john.doe@example.com',
+ 'email_verified': False,
+ 'family_name': 'Doe',
+ 'given_name': 'John',
+ 'groups': ['/staff'],
+ 'name': 'John Doe',
+ 'preferred_username': 'johndoe',
+ 'sub': 'feacd344-b468-4a65-a236-14f61e6b7200'
+}
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/test_backends.py
@@ -0,0 +1,81 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime, timedelta
+
+from django.contrib.auth import authenticate, get_backends
+
+import pytest
+
+from django.conf import settings
+
+from swh.web.auth.models import OIDCUser
+from swh.web.common.utils import reverse
+
+from . import sample_data
+from .keycloak_mock import mock_keycloak
+
+
+def _authenticate_user(request_factory):
+ request = request_factory.get(reverse('oidc-login-complete'))
+
+ return authenticate(request=request,
+ code='some-code',
+ code_verifier='some-code-verifier',
+ redirect_uri='https://localhost:5004')
+
+
+def _check_authenticated_user(user):
+ userinfo = sample_data.userinfo
+ assert user is not None
+ assert isinstance(user, OIDCUser)
+ assert user.id != 0
+ assert user.username == userinfo['preferred_username']
+ assert user.password == ''
+ assert user.first_name == userinfo['given_name']
+ assert user.last_name == userinfo['family_name']
+ assert user.email == userinfo['email']
+ assert user.is_staff == ('/staff' in userinfo['groups'])
+ assert user.sub == userinfo['sub']
+
+
+@pytest.mark.django_db
+def test_oidc_code_pkce_auth_backend_success(mocker, request_factory):
+ kc_oidc_mock = mock_keycloak(mocker)
+ oidc_profile = sample_data.oidc_profile
+ user = _authenticate_user(request_factory)
+
+ _check_authenticated_user(user)
+
+ decoded_token = kc_oidc_mock.decode_token(
+ sample_data.oidc_profile['access_token'])
+ auth_datetime = datetime.fromtimestamp(decoded_token['auth_time'])
+
+ access_expiration = (
+ auth_datetime + timedelta(seconds=oidc_profile['expires_in']))
+ refresh_expiration = (
+ auth_datetime + timedelta(seconds=oidc_profile['refresh_expires_in']))
+
+ assert user.access_token == oidc_profile['access_token']
+ assert user.access_expiration == access_expiration
+ assert user.id_token == oidc_profile['id_token']
+ assert user.refresh_token == oidc_profile['refresh_token']
+ assert user.refresh_expiration == refresh_expiration
+ assert user.scope == oidc_profile['scope']
+ assert user.session_state == oidc_profile['session_state']
+
+ backend_path = 'swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend'
+ assert user.backend == backend_path
+ backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path)
+ assert get_backends()[backend_idx].get_user(user.id) == user
+
+
+@pytest.mark.django_db
+def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory):
+ mock_keycloak(mocker, auth_success=False)
+
+ user = _authenticate_user(request_factory)
+
+ assert user is None
diff --git a/swh/web/tests/auth/test_utils.py b/swh/web/tests/auth/test_utils.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/test_utils.py
@@ -0,0 +1,37 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import hashlib
+import re
+
+from base64 import urlsafe_b64encode
+
+from swh.web.auth.utils import gen_oidc_pkce_codes
+
+
+def test_gen_oidc_pkce_codes():
+    """
+    Check generated PKCE codes respect the specification
+    (see https://tools.ietf.org/html/rfc7636#section-4.1)
+    """
+    code_verifier, code_challenge = gen_oidc_pkce_codes()
+
+    # check the whole code verifier only contains allowed characters
+    # (re.match would only anchor the pattern at the start of the string)
+    assert re.fullmatch(r'[a-zA-Z0-9-\._~]+', code_verifier)
+
+    # check minimum and maximum authorized length for the code verifier
+    assert 43 <= len(code_verifier) <= 128
+
+    # compute expected code challenge from code verifier: url-safe base64
+    # encoding of its SHA-256 digest, with the '=' padding removed
+    challenge = hashlib.sha256(code_verifier.encode('ascii')).digest()
+    challenge = urlsafe_b64encode(challenge).decode('ascii')
+    challenge = challenge.replace('=', '')
+
+    # check base64 padding is not present
+    assert '=' not in code_challenge
+    # check code challenge is valid
+    assert code_challenge == challenge
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/test_views.py
@@ -0,0 +1,275 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from urllib.parse import urljoin, urlparse
+import uuid
+
+from django.conf import settings
+from django.http import QueryDict
+from django.contrib.auth.models import AnonymousUser, User
+
+import pytest
+
+from swh.web.auth.models import OIDCUser
+from swh.web.common.utils import reverse
+from swh.web.tests.django_asserts import assert_template_used, assert_contains
+
+from . import sample_data
+from .keycloak_mock import mock_keycloak
+
+
+@pytest.mark.django_db
+def test_oidc_login_views_success(client, mocker):
+ """
+ Simulate a successful login using the OpenID Connect
+ authorization code flow with PKCE.
+ """
+ # mock Keycloak client
+ kc_oidc_mock = mock_keycloak(mocker)
+
+ # user initiates login process
+ login_url = reverse('oidc-login')
+ response = client.get(login_url)
+ request = response.wsgi_request
+
+ # should redirect to the Keycloak authentication page in order
+ # for the user to log in with their username / password
+ assert response.status_code == 302
+ assert isinstance(request.user, AnonymousUser)
+
+ parsed_url = urlparse(response['location'])
+
+ authorization_url = kc_oidc_mock.well_known()['authorization_endpoint']
+ query_dict = QueryDict(parsed_url.query)
+
+ # check redirect url and its query parameters are valid
+ assert urljoin(response['location'], parsed_url.path) == authorization_url
+ assert 'client_id' in query_dict
+ assert query_dict['client_id'] == settings.OIDC_SWH_WEB_CLIENT_ID
+ assert 'response_type' in query_dict
+ assert query_dict['response_type'] == 'code'
+ assert 'redirect_uri' in query_dict
+ assert query_dict['redirect_uri'] == reverse('oidc-login-complete',
+ request=request)
+ assert 'code_challenge_method' in query_dict
+ assert query_dict['code_challenge_method'] == 'S256'
+ assert 'scope' in query_dict
+ assert query_dict['scope'] == 'openid'
+ assert 'state' in query_dict
+ assert 'code_challenge' in query_dict
+
+ # check login data have been registered in the user session
+ assert 'login_data' in request.session
+ login_data = request.session['login_data']
+ assert 'code_verifier' in login_data
+ assert 'state' in login_data
+ assert 'redirect_uri' in login_data
+ assert login_data['redirect_uri'] == query_dict['redirect_uri']
+
+ # once users have authenticated in Keycloak, they are
+ # redirected to the 'oidc-login-complete' view to
+ # log in to Django.
+
+ # generate authorization code / session state in the same
+ # manner as Keycloak
+ code = f'{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}'
+ session_state = str(uuid.uuid4())
+
+ login_complete_url = reverse('oidc-login-complete',
+ query_params={'code': code,
+ 'state': login_data['state'],
+ 'session_state': session_state})
+
+ # login process finalization
+ response = client.get(login_complete_url)
+ request = response.wsgi_request
+
+ # should redirect to root url by default
+ assert response.status_code == 302
+ assert response['location'] == request.build_absolute_uri('/')
+
+ # user should be authenticated
+ assert isinstance(request.user, OIDCUser)
+
+ # check remote user has not been saved to Django database
+ with pytest.raises(User.DoesNotExist):
+ User.objects.get(username=request.user.username)
+
+
+@pytest.mark.django_db
+def test_oidc_logout_view_success(client, mocker):
+    """
+    Log a user out through the 'oidc-logout' view and check the outcome.
+    """
+    # patch the Keycloak client
+    keycloak_mock = mock_keycloak(mocker)
+    # authenticate the test client, this must go through the Keycloak mock
+    client.login(code='', code_verifier='', redirect_uri='')
+    keycloak_mock.authorization_code.assert_called()
+
+    # request the logout view
+    logout_view_url = reverse('oidc-logout')
+    resp = client.get(logout_view_url)
+    req = resp.wsgi_request
+
+    # response must redirect to the logout page
+    assert resp.status_code == 302
+    expected_path = reverse('logout', query_params={'remote_user': 1})
+    assert resp['location'] == req.build_absolute_uri(expected_path)
+
+    # Keycloak mock logout must have been called with the refresh token
+    refresh_token = sample_data.oidc_profile['refresh_token']
+    keycloak_mock.logout.assert_called_with(refresh_token)
+
+    # no user must remain authenticated in Django
+    assert isinstance(req.user, AnonymousUser)
+
+
+@pytest.mark.django_db
+def test_oidc_login_view_failure(client, mocker):
+    """
+    Simulate a failed login attempt through the 'oidc-login' view.
+    """
+    # patch the Keycloak client with auth_success=False
+    mock_keycloak(mocker, auth_success=False)
+
+    # request the login view
+    login_view_url = reverse('oidc-login')
+    resp = client.get(login_view_url)
+    req = resp.wsgi_request
+
+    # an error page must be rendered with a server error status
+    assert resp.status_code == 500
+    assert_template_used(resp, "error.html")
+
+    # no user should be logged in
+    assert isinstance(req.user, AnonymousUser)
+
+
+# Simulate possible errors with OpenID Connect in the login complete view.
+
+def test_oidc_login_complete_view_no_login_data(client, mocker):
+    # call the login completion view without going through 'oidc-login'
+    # first, so that no 'login_data' has been stored in the session
+    resp = client.get(reverse('oidc-login-complete'))
+
+    # an error page reporting the uninitialized login must be rendered
+    initialization_error = 'Login process has not been initialized.'
+    assert_template_used(resp, "error.html")
+    assert_contains(resp, initialization_error, status_code=500)
+
+
+def test_oidc_login_complete_view_missing_parameters(client, mocker):
+    # store login data in the session as if login had been initiated
+    login_data = {
+        'code_verifier': '',
+        'state': str(uuid.uuid4()),
+        'redirect_uri': '',
+        'next': None,
+    }
+    client_session = client.session
+    client_session['login_data'] = login_data
+    client_session.save()
+
+    # call the login completion view without any query parameter
+    resp = client.get(reverse('oidc-login-complete'))
+    req = resp.wsgi_request
+
+    # an error page reporting the missing parameters must be rendered
+    missing_params_error = 'Missing query parameters for authentication.'
+    assert_template_used(resp, "error.html")
+    assert_contains(resp, missing_params_error, status_code=400)
+
+    # no user should be logged in
+    assert isinstance(req.user, AnonymousUser)
+
+
+def test_oidc_login_complete_wrong_csrf_token(client, mocker):
+    # patch the Keycloak client
+    mock_keycloak(mocker)
+
+    # store login data in the session as if login had been initiated
+    login_data = {
+        'code_verifier': '',
+        'state': str(uuid.uuid4()),
+        'redirect_uri': '',
+        'next': None,
+    }
+    client_session = client.session
+    client_session['login_data'] = login_data
+    client_session.save()
+
+    # call the login completion view with a 'state' query parameter
+    # that differs from the one stored in the session
+    bad_params = {'code': 'some-code', 'state': 'some-state'}
+    complete_url = reverse('oidc-login-complete', query_params=bad_params)
+    resp = client.get(complete_url)
+    req = resp.wsgi_request
+
+    # an error page reporting the wrong CSRF token must be rendered
+    assert_template_used(resp, "error.html")
+    assert_contains(resp, 'Wrong CSRF token, aborting login process.',
+                    status_code=400)
+
+    # no user should be logged in
+    assert isinstance(req.user, AnonymousUser)
+
+
+@pytest.mark.django_db
+def test_oidc_login_complete_wrong_code_verifier(client, mocker):
+    # patch the Keycloak client with auth_success=False
+    mock_keycloak(mocker, auth_success=False)
+
+    # store login data in the session as if login had been initiated
+    login_data = {
+        'code_verifier': '',
+        'state': str(uuid.uuid4()),
+        'redirect_uri': '',
+        'next': None,
+    }
+    client_session = client.session
+    client_session['login_data'] = login_data
+    client_session.save()
+
+    # call the login completion view with the 'state' stored in the session
+    valid_params = {'code': 'some-code', 'state': login_data['state']}
+    complete_url = reverse('oidc-login-complete', query_params=valid_params)
+
+    resp = client.get(complete_url)
+    req = resp.wsgi_request
+
+    # an error page reporting the authentication failure must be rendered
+    assert_template_used(resp, "error.html")
+    assert_contains(resp, 'User authentication failed.',
+                    status_code=500)
+
+    # no user should be logged in
+    assert isinstance(req.user, AnonymousUser)
+
+
+@pytest.mark.django_db
+def test_oidc_logout_view_failure(client, mocker):
+    """
+    Check the 'oidc-logout' view outcome when the Keycloak logout raises.
+    """
+    # patch the Keycloak client
+    keycloak_mock = mock_keycloak(mocker)
+    # authenticate the test client
+    client.login(code='', code_verifier='', redirect_uri='')
+
+    # make the mocked Keycloak logout operation raise an exception
+    failure_reason = 'Authentication server error'
+    keycloak_mock.logout.side_effect = Exception(failure_reason)
+
+    # request the logout view
+    resp = client.get(reverse('oidc-logout'))
+    req = resp.wsgi_request
+
+    # an error page containing the exception message must be rendered
+    assert_template_used(resp, "error.html")
+    assert_contains(resp, failure_reason, status_code=500)
+
+    # user should be logged out from Django anyway
+    assert isinstance(req.user, AnonymousUser)
diff --git a/swh/web/urls.py b/swh/web/urls.py
--- a/swh/web/urls.py
+++ b/swh/web/urls.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2017-2019 The Software Heritage developers
+# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -8,7 +8,9 @@
from django.conf.urls import (
url, include, handler400, handler403, handler404, handler500
)
+from django.contrib.auth.views import LogoutView
from django.contrib.staticfiles.views import serve
+
from django.shortcuts import render
from django.views.generic.base import RedirectView
@@ -40,6 +42,10 @@
 url(r'^(?P<swh_id>swh:[0-9]+:[a-z]+:[0-9a-f]+.*)/$',
swh_id_browse, name='browse-swh-id'),
url(r'^', include('swh.web.misc.urls')),
+ url(r'^', include('swh.web.auth.views')),
+ url(r'^logout/$',
+ LogoutView.as_view(template_name='logout.html'),
+ name='logout'),
]