Changeset View
Changeset View
Standalone View
Standalone View
swh/web/auth/views.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import json | ||||
from typing import Any, Dict, cast | from typing import Any, Dict, Union, cast | ||||
from cryptography.fernet import InvalidToken | from cryptography.fernet import InvalidToken | ||||
from django.conf.urls import url | from django.conf.urls import url | ||||
from django.contrib.auth.decorators import login_required | from django.contrib.auth.decorators import login_required | ||||
from django.core.paginator import Paginator | from django.core.paginator import Paginator | ||||
from django.http import HttpRequest | from django.http import HttpRequest | ||||
from django.http.response import ( | from django.http.response import ( | ||||
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: | ||||
table_data: Dict[str, Any] = {} | table_data: Dict[str, Any] = {} | ||||
table_data["recordsTotal"] = len(tokens_data) | table_data["recordsTotal"] = len(tokens_data) | ||||
table_data["draw"] = int(request.GET["draw"]) | table_data["draw"] = int(request.GET["draw"]) | ||||
table_data["data"] = tokens_data | table_data["data"] = tokens_data | ||||
table_data["recordsFiltered"] = len(tokens_data) | table_data["recordsFiltered"] = len(tokens_data) | ||||
return JsonResponse(table_data) | return JsonResponse(table_data) | ||||
def _encrypted_token_bytes(token: Union[bytes, memoryview]) -> bytes: | |||||
# token has been retrieved from a PosgreSQL database | |||||
if isinstance(token, memoryview): | |||||
return token.tobytes() | |||||
else: | |||||
return token | |||||
@require_http_methods(["POST"]) | @require_http_methods(["POST"]) | ||||
def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: | def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: | ||||
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): | if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): | ||||
return HttpResponseForbidden() | return HttpResponseForbidden() | ||||
try: | try: | ||||
data = json.loads(request.body.decode("ascii")) | data = json.loads(request.body.decode("ascii")) | ||||
user = cast(OIDCUser, request.user) | user = cast(OIDCUser, request.user) | ||||
token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) | token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) | ||||
secret = get_config()["secret_key"].encode() | secret = get_config()["secret_key"].encode() | ||||
salt = user.sub.encode() | salt = user.sub.encode() | ||||
decrypted_token = decrypt_data(token_data.offline_token, secret, salt) | decrypted_token = decrypt_data( | ||||
_encrypted_token_bytes(token_data.offline_token), secret, salt | |||||
) | |||||
return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") | return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") | ||||
except InvalidToken: | except InvalidToken: | ||||
return HttpResponse(status=401) | return HttpResponse(status=401) | ||||
@require_http_methods(["POST"]) | @require_http_methods(["POST"]) | ||||
def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: | def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: | ||||
if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): | if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): | ||||
return HttpResponseForbidden() | return HttpResponseForbidden() | ||||
try: | try: | ||||
data = json.loads(request.body.decode("ascii")) | data = json.loads(request.body.decode("ascii")) | ||||
user = cast(OIDCUser, request.user) | user = cast(OIDCUser, request.user) | ||||
for token_id in data["token_ids"]: | for token_id in data["token_ids"]: | ||||
token_data = OIDCUserOfflineTokens.objects.get(id=token_id) | token_data = OIDCUserOfflineTokens.objects.get(id=token_id) | ||||
secret = get_config()["secret_key"].encode() | secret = get_config()["secret_key"].encode() | ||||
salt = user.sub.encode() | salt = user.sub.encode() | ||||
decrypted_token = decrypt_data(token_data.offline_token, secret, salt) | decrypted_token = decrypt_data( | ||||
_encrypted_token_bytes(token_data.offline_token), secret, salt | |||||
) | |||||
oidc_client = keycloak_oidc_client() | oidc_client = keycloak_oidc_client() | ||||
oidc_client.logout(decrypted_token.decode("ascii")) | oidc_client.logout(decrypted_token.decode("ascii")) | ||||
token_data.delete() | token_data.delete() | ||||
return HttpResponse(status=200) | return HttpResponse(status=200) | ||||
except InvalidToken: | except InvalidToken: | ||||
return HttpResponse(status=401) | return HttpResponse(status=401) | ||||
Show All 33 Lines |