diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py
index c4405613..0d119c7e 100644
--- a/swh/web/common/utils.py
+++ b/swh/web/common/utils.py
@@ -1,346 +1,350 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
from datetime import datetime, timezone
from dateutil import parser as date_parser
from dateutil import tz
from typing import Optional, Dict, Any
import docutils.parsers.rst
import docutils.utils
from bs4 import BeautifulSoup
from docutils.core import publish_parts
from docutils.writers.html5_polyglot import Writer, HTMLTranslator
from django.urls import reverse as django_reverse
from django.http import QueryDict, HttpRequest
from prometheus_client.registry import CollectorRegistry
from rest_framework.authentication import SessionAuthentication
from swh.web.common.exc import BadInputExc
from swh.web.common.typing import QueryParameters
from swh.web.config import get_config
SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True)
swh_object_icons = {
"branch": "fa fa-code-fork",
"branches": "fa fa-code-fork",
"content": "fa fa-file-text",
"directory": "fa fa-folder",
"person": "fa fa-user",
"revisions history": "fa fa-history",
"release": "fa fa-tag",
"releases": "fa fa-tag",
"revision": "octicon-git-commit",
"snapshot": "fa fa-camera",
"visits": "fa fa-calendar",
}
def reverse(
viewname: str,
url_args: Optional[Dict[str, Any]] = None,
query_params: Optional[QueryParameters] = None,
current_app: Optional[str] = None,
urlconf: Optional[str] = None,
request: Optional[HttpRequest] = None,
) -> str:
"""An override of django reverse function supporting query parameters.
Args:
viewname: the name of the django view from which to compute a url
url_args: dictionary of url arguments indexed by their names
query_params: dictionary of query parameters to append to the
reversed url
current_app: the name of the django app tighten to the view
urlconf: url configuration module
request: build an absolute URI if provided
Returns:
str: the url of the requested view with processed arguments and
query parameters
"""
if url_args:
url_args = {k: v for k, v in url_args.items() if v is not None}
url = django_reverse(
viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app
)
if query_params:
query_params = {k: v for k, v in query_params.items() if v}
if query_params and len(query_params) > 0:
query_dict = QueryDict("", mutable=True)
for k in sorted(query_params.keys()):
query_dict[k] = query_params[k]
url += "?" + query_dict.urlencode(safe="/;:")
if request is not None:
url = request.build_absolute_uri(url)
return url
def datetime_to_utc(date):
"""Returns datetime in UTC without timezone info
Args:
date (datetime.datetime): input datetime with timezone info
Returns:
datetime.datetime: datetime in UTC without timezone info
"""
if date.tzinfo:
return date.astimezone(tz.gettz("UTC")).replace(tzinfo=timezone.utc)
else:
return date
def parse_timestamp(timestamp):
"""Given a time or timestamp (as string), parse the result as UTC datetime.
Returns:
datetime.datetime: a timezone-aware datetime representing the
parsed value or None if the parsing fails.
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- Today is January 1, 2047 at 8:21:00AM
- 1452591542
"""
if not timestamp:
return None
try:
date = date_parser.parse(timestamp, ignoretz=False, fuzzy=True)
return datetime_to_utc(date)
except Exception:
try:
return datetime.utcfromtimestamp(float(timestamp)).replace(
tzinfo=timezone.utc
)
except (ValueError, OverflowError) as e:
raise BadInputExc(e)
def shorten_path(path):
"""Shorten the given path: for each hash present, only return the first
8 characters followed by an ellipsis"""
sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}"
sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}"
ret = re.sub(sha256_re, r"\1...", path)
return re.sub(sha1_re, r"\1...", ret)
def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"):
"""Turns a string representation of an ISO 8601 date string
to UTC and format it into a more human readable one.
For instance, from the following input
string: '2017-05-04T13:27:13+02:00' the following one
is returned: '04 May 2017, 11:27 UTC'.
Custom format string may also be provided
as parameter
Args:
iso_date (str): a string representation of an ISO 8601 date
fmt (str): optional date formatting string
Returns:
str: a formatted string representation of the input iso date
"""
if not iso_date:
return iso_date
date = parse_timestamp(iso_date)
return date.strftime(fmt)
def gen_path_info(path):
"""Function to generate path data navigation for use
with a breadcrumb in the swh web ui.
For instance, from a path /folder1/folder2/folder3,
it returns the following list::
[{'name': 'folder1', 'path': 'folder1'},
{'name': 'folder2', 'path': 'folder1/folder2'},
{'name': 'folder3', 'path': 'folder1/folder2/folder3'}]
Args:
path: a filesystem path
Returns:
list: a list of path data for navigation as illustrated above.
"""
path_info = []
if path:
sub_paths = path.strip("/").split("/")
path_from_root = ""
for p in sub_paths:
path_from_root += "/" + p
path_info.append({"name": p, "path": path_from_root.strip("/")})
return path_info
def parse_rst(text, report_level=2):
"""
Parse a reStructuredText string with docutils.
Args:
text (str): string with reStructuredText markups in it
report_level (int): level of docutils report messages to print
(1 info 2 warning 3 error 4 severe 5 none)
Returns:
docutils.nodes.document: a parsed docutils document
"""
parser = docutils.parsers.rst.Parser()
components = (docutils.parsers.rst.Parser,)
settings = docutils.frontend.OptionParser(
components=components
).get_default_values()
settings.report_level = report_level
document = docutils.utils.new_document("rst-doc", settings=settings)
parser.parse(text, document)
return document
def get_client_ip(request):
"""
Return the client IP address from an incoming HTTP request.
Args:
request (django.http.HttpRequest): the incoming HTTP request
Returns:
str: The client IP address
"""
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
else:
ip = request.META.get("REMOTE_ADDR")
return ip
def context_processor(request):
"""
Django context processor used to inject variables
in all swh-web templates.
"""
config = get_config()
- if request.user.is_authenticated and not hasattr(request.user, "backend"):
+ if (
+ hasattr(request, "user")
+ and request.user.is_authenticated
+ and not hasattr(request.user, "backend")
+ ):
# To avoid django.template.base.VariableDoesNotExist errors
# when rendering templates when standard Django user is logged in.
request.user.backend = "django.contrib.auth.backends.ModelBackend"
return {
"swh_object_icons": swh_object_icons,
"available_languages": None,
"swh_client_config": config["client_config"],
"oidc_enabled": bool(config["keycloak"]["server_url"]),
}
class EnforceCSRFAuthentication(SessionAuthentication):
"""
Helper class to enforce CSRF validation on a DRF view
when a user is not authenticated.
"""
def authenticate(self, request):
user = getattr(request._request, "user", None)
self.enforce_csrf(request)
return (user, None)
def resolve_branch_alias(
snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""
Resolve branch alias in snapshot content.
Args:
snapshot: a full snapshot content
branch: a branch alias contained in the snapshot
Returns:
The real snapshot branch that got aliased.
"""
while branch and branch["target_type"] == "alias":
if branch["target"] in snapshot["branches"]:
branch = snapshot["branches"][branch["target"]]
else:
from swh.web.common import service
snp = service.lookup_snapshot(
snapshot["id"], branches_from=branch["target"], branches_count=1
)
if snp and branch["target"] in snp["branches"]:
branch = snp["branches"][branch["target"]]
else:
branch = None
return branch
class _NoHeaderHTMLTranslator(HTMLTranslator):
"""
Docutils translator subclass to customize the generation of HTML
from reST-formatted docstrings
"""
def __init__(self, document):
super().__init__(document)
self.body_prefix = []
self.body_suffix = []
_HTML_WRITER = Writer()
_HTML_WRITER.translator_class = _NoHeaderHTMLTranslator
def rst_to_html(rst: str) -> str:
"""
Convert reStructuredText document into HTML.
Args:
rst: A string containing a reStructuredText document
Returns:
Body content of the produced HTML conversion.
"""
settings = {
"initial_header_level": 2,
}
pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings)
return f'
{pp["html_body"]}
'
def prettify_html(html: str) -> str:
"""
Prettify an HTML document.
Args:
html: Input HTML document
Returns:
The prettified HTML document
"""
return BeautifulSoup(html, "lxml").prettify()
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py
index 0a5eb38c..8b8dda0d 100644
--- a/swh/web/tests/auth/test_views.py
+++ b/swh/web/tests/auth/test_views.py
@@ -1,328 +1,338 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from urllib.parse import urljoin, urlparse
import uuid
from django.http import QueryDict
from django.contrib.auth.models import AnonymousUser, User
import pytest
from swh.web.auth.models import OIDCUser
from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID
from swh.web.common.utils import reverse
from swh.web.tests.django_asserts import assert_template_used, assert_contains
+from swh.web.urls import _default_view as homepage_view
from . import sample_data
from .keycloak_mock import mock_keycloak
@pytest.mark.django_db
def test_oidc_login_views_success(client, mocker):
"""
Simulate a successful login authentication with OpenID Connect
authorization code flow with PKCE.
"""
# mock Keycloak client
kc_oidc_mock = mock_keycloak(mocker)
# user initiates login process
login_url = reverse("oidc-login")
response = client.get(login_url)
request = response.wsgi_request
# should redirect to Keycloak authentication page in order
# for a user to login with its username / password
assert response.status_code == 302
assert isinstance(request.user, AnonymousUser)
parsed_url = urlparse(response["location"])
authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"]
query_dict = QueryDict(parsed_url.query)
# check redirect url is valid
assert urljoin(response["location"], parsed_url.path) == authorization_url
assert "client_id" in query_dict
assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID
assert "response_type" in query_dict
assert query_dict["response_type"] == "code"
assert "redirect_uri" in query_dict
assert query_dict["redirect_uri"] == reverse("oidc-login-complete", request=request)
assert "code_challenge_method" in query_dict
assert query_dict["code_challenge_method"] == "S256"
assert "scope" in query_dict
assert query_dict["scope"] == "openid"
assert "state" in query_dict
assert "code_challenge" in query_dict
# check a login_data has been registered in user session
assert "login_data" in request.session
login_data = request.session["login_data"]
assert "code_verifier" in login_data
assert "state" in login_data
assert "redirect_uri" in login_data
assert login_data["redirect_uri"] == query_dict["redirect_uri"]
# once a user has identified himself in Keycloak, he is
# redirected to the 'oidc-login-complete' view to
# login in Django.
# generate authorization code / session state in the same
# manner as Keycloak
code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}"
session_state = str(uuid.uuid4())
login_complete_url = reverse(
"oidc-login-complete",
query_params={
"code": code,
"state": login_data["state"],
"session_state": session_state,
},
)
# login process finalization
response = client.get(login_complete_url)
request = response.wsgi_request
# should redirect to root url by default
assert response.status_code == 302
assert response["location"] == request.build_absolute_uri("/")
# user should be authenticated
assert isinstance(request.user, OIDCUser)
# check remote user has not been saved to Django database
with pytest.raises(User.DoesNotExist):
User.objects.get(username=request.user.username)
@pytest.mark.django_db
def test_oidc_logout_view_success(client, mocker):
"""
Simulate a successful logout operation with OpenID Connect.
"""
# mock Keycloak client
kc_oidc_mock = mock_keycloak(mocker)
# login our test user
client.login(code="", code_verifier="", redirect_uri="")
kc_oidc_mock.authorization_code.assert_called()
# user initiates logout
oidc_logout_url = reverse("oidc-logout")
response = client.get(oidc_logout_url)
request = response.wsgi_request
# should redirect to logout page
assert response.status_code == 302
logout_url = reverse("logout", query_params={"remote_user": 1})
assert response["location"] == request.build_absolute_uri(logout_url)
# should have been logged out in Keycloak
kc_oidc_mock.logout.assert_called_with(sample_data.oidc_profile["refresh_token"])
# check effective logout in Django
assert isinstance(request.user, AnonymousUser)
@pytest.mark.django_db
def test_oidc_login_view_failure(client, mocker):
"""
Simulate a failed authentication with OpenID Connect.
"""
# mock Keycloak client
mock_keycloak(mocker, auth_success=False)
# user initiates login process
login_url = reverse("oidc-login")
response = client.get(login_url)
request = response.wsgi_request
# should render an error page
assert response.status_code == 500
assert_template_used(response, "error.html")
# no users should be logged in
assert isinstance(request.user, AnonymousUser)
# Simulate possible errors with OpenID Connect in the login complete view.
def test_oidc_login_complete_view_no_login_data(client, mocker):
# user initiates login process
login_url = reverse("oidc-login-complete")
response = client.get(login_url)
# should render an error page
assert_template_used(response, "error.html")
assert_contains(
response, "Login process has not been initialized.", status_code=500
)
def test_oidc_login_complete_view_missing_parameters(client, mocker):
# simulate login process has been initialized
session = client.session
session["login_data"] = {
"code_verifier": "",
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
"prompt": "",
}
session.save()
# user initiates login process
login_url = reverse("oidc-login-complete")
response = client.get(login_url)
request = response.wsgi_request
# should render an error page
assert_template_used(response, "error.html")
assert_contains(
response, "Missing query parameters for authentication.", status_code=400
)
# no user should be logged in
assert isinstance(request.user, AnonymousUser)
def test_oidc_login_complete_wrong_csrf_token(client, mocker):
# mock Keycloak client
mock_keycloak(mocker)
# simulate login process has been initialized
session = client.session
session["login_data"] = {
"code_verifier": "",
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
"prompt": "",
}
session.save()
# user initiates login process
login_url = reverse(
"oidc-login-complete", query_params={"code": "some-code", "state": "some-state"}
)
response = client.get(login_url)
request = response.wsgi_request
# should render an error page
assert_template_used(response, "error.html")
assert_contains(
response, "Wrong CSRF token, aborting login process.", status_code=400
)
# no user should be logged in
assert isinstance(request.user, AnonymousUser)
@pytest.mark.django_db
def test_oidc_login_complete_wrong_code_verifier(client, mocker):
# mock Keycloak client
mock_keycloak(mocker, auth_success=False)
# simulate login process has been initialized
session = client.session
session["login_data"] = {
"code_verifier": "",
"state": str(uuid.uuid4()),
"redirect_uri": "",
"next_path": "",
"prompt": "",
}
session.save()
# check authentication error is reported
login_url = reverse(
"oidc-login-complete",
query_params={"code": "some-code", "state": session["login_data"]["state"]},
)
response = client.get(login_url)
request = response.wsgi_request
# should render an error page
assert_template_used(response, "error.html")
assert_contains(response, "User authentication failed.", status_code=500)
# no user should be logged in
assert isinstance(request.user, AnonymousUser)
@pytest.mark.django_db
def test_oidc_logout_view_failure(client, mocker):
"""
Simulate a failed logout operation with OpenID Connect.
"""
# mock Keycloak client
kc_oidc_mock = mock_keycloak(mocker)
# login our test user
client.login(code="", code_verifier="", redirect_uri="")
err_msg = "Authentication server error"
kc_oidc_mock.logout.side_effect = Exception(err_msg)
# user initiates logout process
logout_url = reverse("oidc-logout")
response = client.get(logout_url)
request = response.wsgi_request
# should render an error page
assert_template_used(response, "error.html")
assert_contains(response, err_msg, status_code=500)
# user should be logged out from Django anyway
assert isinstance(request.user, AnonymousUser)
@pytest.mark.django_db
def test_oidc_silent_refresh_failure(client, mocker):
# mock Keycloak client
mock_keycloak(mocker)
next_path = reverse("swh-web-homepage")
# silent session refresh initialization
login_url = reverse(
"oidc-login", query_params={"next_path": next_path, "prompt": "none"}
)
response = client.get(login_url)
request = response.wsgi_request
login_data = request.session["login_data"]
# check prompt value has been registered in user session
assert "prompt" in login_data
assert login_data["prompt"] == "none"
# simulate a failed silent session refresh
session_state = str(uuid.uuid4())
login_complete_url = reverse(
"oidc-login-complete",
query_params={
"error": "login_required",
"state": login_data["state"],
"session_state": session_state,
},
)
# login process finalization
response = client.get(login_complete_url)
request = response.wsgi_request
# should redirect to logout page
assert response.status_code == 302
logout_url = reverse(
"logout", query_params={"next_path": next_path, "remote_user": 1}
)
assert response["location"] == logout_url
+
+
+def test_view_rendering_when_user_not_set_in_request(request_factory):
+ request = request_factory.get("/")
+ # Django RequestFactory do not set any user by default
+ assert not hasattr(request, "user")
+
+ response = homepage_view(request)
+ assert response.status_code == 200