diff --git a/swh/web/auth/mailmap.py b/swh/web/auth/mailmap.py new file mode 100644 index 00000000..378e90e2 --- /dev/null +++ b/swh/web/auth/mailmap.py @@ -0,0 +1,54 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from django.conf.urls import url +from django.http.response import HttpResponse, HttpResponseForbidden +from rest_framework import serializers +from rest_framework.decorators import api_view +from rest_framework.request import Request +from rest_framework.response import Response + +from swh.web.auth.models import UserMailmap +from swh.web.auth.utils import MAILMAP_PERMISSION + + +class UserMailmapSerializer(serializers.ModelSerializer): + class Meta: + model = UserMailmap + fields = "__all__" + + +@api_view(["POST"]) +def profile_add_mailmap(request: Request) -> HttpResponse: + if not request.user.has_perm(MAILMAP_PERMISSION): + return HttpResponseForbidden() + + UserMailmap.objects.create(user_id=str(request.user.id), **request.data) + mm = UserMailmap.objects.get( + user_id=str(request.user.id), from_email=request.data.get("from_email") + ) + return Response(UserMailmapSerializer(mm).data) + + +@api_view(["POST"]) +def profile_update_mailmap(request: Request) -> HttpResponse: + if not request.user.has_perm(MAILMAP_PERMISSION): + return HttpResponseForbidden() + + UserMailmap.objects.update(user_id=str(request.user.id), **request.data) + mm = UserMailmap.objects.get( + user_id=str(request.user.id), from_email=request.data.get("from_email") + ) + return Response(UserMailmapSerializer(mm).data) + + +urlpatterns = [ + url(r"^profile/mailmap/add$", profile_add_mailmap, name="profile-mailmap-add",), + url( + r"^profile/mailmap/update$", + profile_update_mailmap, + name="profile-mailmap-update", + ), +] diff --git a/swh/web/auth/migrations/0004_usermailmap.py b/swh/web/auth/migrations/0004_usermailmap.py new file mode 100644 index 00000000..86f25499 --- /dev/null +++ b/swh/web/auth/migrations/0004_usermailmap.py @@ -0,0 +1,45 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("swh_web_auth", "0003_delete_oidcuser"), + ] + + operations = [ + migrations.CreateModel( + name="UserMailmap", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("user_id", models.CharField(max_length=50, null=True)), + ("from_email", models.TextField(unique=True, null=False)), + ("from_email_verified", models.BooleanField(default=False)), + ( + "from_email_verification_request_date", + models.DateTimeField(null=True), + ), + ("display_name", models.TextField(null=False)), + ("display_name_activated", models.BooleanField(default=False)), + ("to_email", models.TextField(null=True)), + ("to_email_verified", models.BooleanField(default=False)), + ("to_email_verification_request_date", models.DateTimeField(null=True)), + ("mailmap_last_processing_date", models.DateTimeField(null=True)), + ("last_update_date", models.DateTimeField(auto_now=True)), + ], + options={"db_table": "user_mailmap",}, + ), + ] diff --git a/swh/web/auth/models.py b/swh/web/auth/models.py index 5ac29aea..c65bc76b 100644 --- a/swh/web/auth/models.py +++ b/swh/web/auth/models.py @@ -1,20 +1,63 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.db import models class OIDCUserOfflineTokens(models.Model): """ Model storing encrypted bearer tokens generated by users. """ user_id = models.CharField(max_length=50) creation_date = models.DateTimeField(auto_now_add=True) offline_token = models.BinaryField() class Meta: app_label = "swh_web_auth" db_table = "oidc_user_offline_tokens" + + +class UserMailmap(models.Model): + """ + Model storing mailmap settings submitted by users. + """ + + user_id = models.CharField(max_length=50, null=True) + """Optional user id from Keycloak""" + + from_email = models.TextField(unique=True, null=False) + """Email address to find author in the archive""" + + from_email_verified = models.BooleanField(default=False) + """Indicates if the from email has been verified""" + + from_email_verification_request_date = models.DateTimeField(null=True) + """Last from email verification request date""" + + display_name = models.TextField(null=False) + """Display name to use for the author instead of the archived one""" + + display_name_activated = models.BooleanField(default=False) + """Indicates if the new display name should be used""" + + to_email = models.TextField(null=True) + """Optional new email to use in the display name instead of the archived one""" + + to_email_verified = models.BooleanField(default=False) + """Indicates if the to email has been verified""" + + to_email_verification_request_date = models.DateTimeField(null=True) + """Last to email verification request date""" + + mailmap_last_processing_date = models.DateTimeField(null=True) + """Last mailmap synchronisation date with swh-storage""" + + last_update_date = models.DateTimeField(auto_now=True) + """Last date that mailmap model was updated""" + + class Meta: + app_label = "swh_web_auth" + db_table = "user_mailmap" diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py index e213a714..341d0f06 100644 --- a/swh/web/auth/utils.py +++ b/swh/web/auth/utils.py @@ -1,96 +1,97 @@ -# Copyright (C) 2020-2021 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import urlsafe_b64encode from typing import List from cryptography.fernet import Fernet from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from django.http.request import HttpRequest OIDC_SWH_WEB_CLIENT_ID = "swh-web" SWH_AMBASSADOR_PERMISSION = "swh.ambassador" API_SAVE_ORIGIN_PERMISSION = "swh.web.api.save_origin" ADMIN_LIST_DEPOSIT_PERMISSION = "swh.web.admin.list_deposits" +MAILMAP_PERMISSION = "swh.web.mailmap" def _get_fernet(password: bytes, salt: bytes) -> Fernet: """ Instantiate a Fernet system from a password and a salt value (see https://cryptography.io/en/latest/fernet/). Args: password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The Fernet system """ kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend(), ) key = urlsafe_b64encode(kdf.derive(password)) return Fernet(key) def encrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes: """ Encrypt data using Fernet system (symmetric encryption). Args: data: input data to encrypt password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The encrypted data """ return _get_fernet(password, salt).encrypt(data) def decrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes: """ Decrypt data using Fernet system (symmetric encryption). Args: data: input data to decrypt password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The decrypted data """ return _get_fernet(password, salt).decrypt(data) def privileged_user(request: HttpRequest, permissions: List[str] = []) -> bool: """Determine whether a user is authenticated and is a privileged one (e.g ambassador). This allows such user to have access to some more actions (e.g. bypass save code now review, access to 'archives' type...). A user is considered as privileged if he is a staff member or has any permission from those provided as parameters. Args: request: Input django HTTP request permissions: list of permission names to determine if user is privileged or not Returns: Whether the user is privileged or not. """ user = request.user return user.is_authenticated and ( user.is_staff or any([user.has_perm(perm) for perm in permissions]) ) diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index ecd3ff94..28dc14e7 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,187 +1,193 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Union, cast from cryptography.fernet import InvalidToken from django.conf.urls import url from django.contrib.auth.decorators import login_required from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( HttpResponse, HttpResponseBadRequest, HttpResponseForbidden, HttpResponseRedirect, JsonResponse, ) from django.shortcuts import render from django.views.decorators.http import require_http_methods from swh.auth.django.models import OIDCUser from swh.auth.django.utils import keycloak_oidc_client from swh.auth.django.views import get_oidc_login_data, oidc_login_view from swh.auth.django.views import urlpatterns as auth_urlpatterns from swh.auth.keycloak import KeycloakError, keycloak_error_message from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import decrypt_data, encrypt_data from swh.web.common.exc import ForbiddenExc from swh.web.common.utils import reverse from swh.web.config import get_config +from .mailmap import urlpatterns as mailmap_urlpatterns + def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) return oidc_login_view( request, redirect_uri=redirect_uri, scope="openid offline_access" ) def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): raise ForbiddenExc("You are not allowed to generate bearer tokens.") if "error" in request.GET: raise Exception(request.GET["error"]) login_data = get_oidc_login_data(request) oidc_client = keycloak_oidc_client() oidc_profile = oidc_client.authorization_code( code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) user = cast(OIDCUser, request.user) token = oidc_profile["refresh_token"] secret = get_config()["secret_key"].encode() salt = user.sub.encode() encrypted_token = encrypt_data(token.encode(), secret, salt) OIDCUserOfflineTokens.objects.create( user_id=str(user.id), offline_token=encrypted_token ).save() return HttpResponseRedirect(reverse("oidc-profile") + "#tokens") def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id)) tokens = tokens.order_by("-creation_date") length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(tokens, length) tokens_data = [ {"id": t.id, "creation_date": t.creation_date.isoformat()} for t in paginator.page(int(page)).object_list ] table_data: Dict[str, Any] = {} table_data["recordsTotal"] = len(tokens_data) table_data["draw"] = int(request.GET["draw"]) table_data["data"] = tokens_data table_data["recordsFiltered"] = len(tokens_data) return JsonResponse(table_data) def _encrypted_token_bytes(token: Union[bytes, memoryview]) -> bytes: # token has been retrieved from a PosgreSQL database if isinstance(token, memoryview): return token.tobytes() else: return token @require_http_methods(["POST"]) def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data( _encrypted_token_bytes(token_data.offline_token), secret, salt ) refresh_token = decrypted_token.decode("ascii") # check token is still valid oidc_client = keycloak_oidc_client() oidc_client.refresh_token(refresh_token) return HttpResponse(refresh_token, content_type="text/plain") except InvalidToken: return HttpResponse(status=401) except KeycloakError as ke: error_msg = keycloak_error_message(ke) if error_msg in ( "invalid_grant: Offline session not active", "invalid_grant: Offline user session not found", ): error_msg = "Bearer token has expired, please generate a new one." return HttpResponseBadRequest(error_msg, content_type="text/plain") @require_http_methods(["POST"]) def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data( _encrypted_token_bytes(token_data.offline_token), secret, salt ) oidc_client = keycloak_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) except InvalidToken: return HttpResponse(status=401) @login_required(login_url="/oidc/login/", redirect_field_name="next_path") def _oidc_profile_view(request: HttpRequest) -> HttpResponse: return render(request, "auth/profile.html") -urlpatterns = auth_urlpatterns + [ - url( - r"^oidc/generate-bearer-token/$", - oidc_generate_bearer_token, - name="oidc-generate-bearer-token", - ), - url( - r"^oidc/generate-bearer-token-complete/$", - oidc_generate_bearer_token_complete, - name="oidc-generate-bearer-token-complete", - ), - url( - r"^oidc/list-bearer-token/$", - oidc_list_bearer_tokens, - name="oidc-list-bearer-tokens", - ), - url( - r"^oidc/get-bearer-token/$", - oidc_get_bearer_token, - name="oidc-get-bearer-token", - ), - url( - r"^oidc/revoke-bearer-tokens/$", - oidc_revoke_bearer_tokens, - name="oidc-revoke-bearer-tokens", - ), - url(r"^oidc/profile/$", _oidc_profile_view, name="oidc-profile",), -] +urlpatterns = ( + auth_urlpatterns + + [ + url( + r"^oidc/generate-bearer-token/$", + oidc_generate_bearer_token, + name="oidc-generate-bearer-token", + ), + url( + r"^oidc/generate-bearer-token-complete/$", + oidc_generate_bearer_token_complete, + name="oidc-generate-bearer-token-complete", + ), + url( + r"^oidc/list-bearer-token/$", + oidc_list_bearer_tokens, + name="oidc-list-bearer-tokens", + ), + url( + r"^oidc/get-bearer-token/$", + oidc_get_bearer_token, + name="oidc-get-bearer-token", + ), + url( + r"^oidc/revoke-bearer-tokens/$", + oidc_revoke_bearer_tokens, + name="oidc-revoke-bearer-tokens", + ), + url(r"^oidc/profile/$", _oidc_profile_view, name="oidc-profile",), + ] + + mailmap_urlpatterns +) diff --git a/swh/web/tests/auth/test_mailmap.py b/swh/web/tests/auth/test_mailmap.py new file mode 100644 index 00000000..6308f564 --- /dev/null +++ b/swh/web/tests/auth/test_mailmap.py @@ -0,0 +1,45 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.web.auth.utils import MAILMAP_PERMISSION +from swh.web.common.utils import reverse +from swh.web.tests.utils import check_api_post_response, create_django_permission + + +@pytest.fixture +def mailmap_user(regular_user): + regular_user.user_permissions.add(create_django_permission(MAILMAP_PERMISSION)) + return regular_user + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.parametrize("view_name", ["profile-mailmap-add", "profile-mailmap-update"]) +def test_mailmap_endpoints_anonymous_user(api_client, view_name): + url = reverse(view_name) + check_api_post_response(api_client, url, status_code=403) + + +@pytest.mark.django_db(transaction=True) +def test_mailmap_endpoints_user_with_permission(api_client, mailmap_user): + api_client.force_login(mailmap_user) + for view_name in ("profile-mailmap-add", "profile-mailmap-update"): + url = reverse(view_name) + check_api_post_response( + api_client, + url, + data={"from_email": "bar@example.org", "display_name": "bar"}, + status_code=200, + ) + + +@pytest.mark.django_db(transaction=True) +def test_mailmap_endpoints_error_response(api_client, mailmap_user): + api_client.force_login(mailmap_user) + for view_name in ("profile-mailmap-add", "profile-mailmap-update"): + url = reverse(view_name) + resp = check_api_post_response(api_client, url, status_code=500) + assert "exception" in resp.data