Changeset View
Changeset View
Standalone View
Standalone View
swh/web/auth/views.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import json | ||||
from typing import Any, Dict, Union, cast | from typing import Any, Dict, Union, cast | ||||
from cryptography.fernet import InvalidToken | from cryptography.fernet import InvalidToken | ||||
from django.conf.urls import url | from django.conf.urls import url | ||||
from django.contrib.auth.decorators import login_required | from django.contrib.auth.decorators import login_required | ||||
from django.core.paginator import Paginator | from django.core.paginator import Paginator | ||||
from django.http import HttpRequest | from django.http import HttpRequest | ||||
from django.http.response import ( | from django.http.response import ( | ||||
HttpResponse, | HttpResponse, | ||||
HttpResponseBadRequest, | |||||
HttpResponseForbidden, | HttpResponseForbidden, | ||||
HttpResponseRedirect, | HttpResponseRedirect, | ||||
JsonResponse, | JsonResponse, | ||||
) | ) | ||||
from django.shortcuts import render | from django.shortcuts import render | ||||
from django.views.decorators.http import require_http_methods | from django.views.decorators.http import require_http_methods | ||||
from swh.auth.django.models import OIDCUser | from swh.auth.django.models import OIDCUser | ||||
from swh.auth.django.utils import keycloak_oidc_client | from swh.auth.django.utils import keycloak_oidc_client | ||||
from swh.auth.django.views import get_oidc_login_data, oidc_login_view | from swh.auth.django.views import get_oidc_login_data, oidc_login_view | ||||
from swh.auth.django.views import urlpatterns as auth_urlpatterns | from swh.auth.django.views import urlpatterns as auth_urlpatterns | ||||
from swh.auth.keycloak import KeycloakError, keycloak_error_message | |||||
from swh.web.auth.models import OIDCUserOfflineTokens | from swh.web.auth.models import OIDCUserOfflineTokens | ||||
from swh.web.auth.utils import decrypt_data, encrypt_data | from swh.web.auth.utils import decrypt_data, encrypt_data | ||||
from swh.web.common.exc import ForbiddenExc | from swh.web.common.exc import ForbiddenExc | ||||
from swh.web.common.utils import reverse | from swh.web.common.utils import reverse | ||||
from swh.web.config import get_config | from swh.web.config import get_config | ||||
def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: | def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: | ||||
▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | try: | ||||
data = json.loads(request.body.decode("ascii")) | data = json.loads(request.body.decode("ascii")) | ||||
user = cast(OIDCUser, request.user) | user = cast(OIDCUser, request.user) | ||||
token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) | token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) | ||||
secret = get_config()["secret_key"].encode() | secret = get_config()["secret_key"].encode() | ||||
salt = user.sub.encode() | salt = user.sub.encode() | ||||
decrypted_token = decrypt_data( | decrypted_token = decrypt_data( | ||||
_encrypted_token_bytes(token_data.offline_token), secret, salt | _encrypted_token_bytes(token_data.offline_token), secret, salt | ||||
) | ) | ||||
return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") | refresh_token = decrypted_token.decode("ascii") | ||||
# check token is still valid | |||||
oidc_client = keycloak_oidc_client() | |||||
oidc_client.refresh_token(refresh_token) | |||||
return HttpResponse(refresh_token, content_type="text/plain") | |||||
except InvalidToken: | except InvalidToken: | ||||
return HttpResponse(status=401) | return HttpResponse(status=401) | ||||
except KeycloakError as ke: | |||||
error_msg = keycloak_error_message(ke) | |||||
if error_msg in ( | |||||
"invalid_grant: Offline session not active", | |||||
"invalid_grant: Offline user session not found", | |||||
): | |||||
error_msg = "Bearer token has expired, please generate a new one." | |||||
return HttpResponseBadRequest(error_msg, content_type="text/plain") | |||||
@require_http_methods(["POST"]) | @require_http_methods(["POST"]) | ||||
def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: | def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: | ||||
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): | if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): | ||||
return HttpResponseForbidden() | return HttpResponseForbidden() | ||||
try: | try: | ||||
data = json.loads(request.body.decode("ascii")) | data = json.loads(request.body.decode("ascii")) | ||||
▲ Show 20 Lines • Show All 49 Lines • Show Last 20 Lines |