diff --git a/swh/web/add_forge_now/views.py b/swh/web/add_forge_now/views.py index ce6883c3..c17fdb06 100644 --- a/swh/web/add_forge_now/views.py +++ b/swh/web/add_forge_now/views.py @@ -1,160 +1,160 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, List from django.conf import settings -from django.conf.urls import url from django.contrib.auth.decorators import user_passes_test from django.core.paginator import Paginator from django.db.models import Q from django.http.request import HttpRequest from django.http.response import HttpResponse, JsonResponse from django.shortcuts import render +from django.urls import re_path as url from swh.web.add_forge_now.models import Request as AddForgeRequest from swh.web.add_forge_now.models import RequestHistory from swh.web.api.views.add_forge_now import ( AddForgeNowRequestPublicSerializer, AddForgeNowRequestSerializer, ) from swh.web.auth.utils import is_add_forge_now_moderator def add_forge_request_list_datatables(request: HttpRequest) -> HttpResponse: """Dedicated endpoint used by datatables to display the add-forge requests in the Web UI. 
""" draw = int(request.GET.get("draw", 0)) add_forge_requests = AddForgeRequest.objects.all() table_data: Dict[str, Any] = { "recordsTotal": add_forge_requests.count(), "draw": draw, } search_value = request.GET.get("search[value]") column_order = request.GET.get("order[0][column]") field_order = request.GET.get(f"columns[{column_order}][name]", "id") order_dir = request.GET.get("order[0][dir]", "desc") if field_order: if order_dir == "desc": field_order = "-" + field_order add_forge_requests = add_forge_requests.order_by(field_order) per_page = int(request.GET.get("length", 10)) page_num = int(request.GET.get("start", 0)) // per_page + 1 if search_value: add_forge_requests = add_forge_requests.filter( Q(forge_type__icontains=search_value) | Q(forge_url__icontains=search_value) | Q(status__icontains=search_value) ) if ( int(request.GET.get("user_requests_only", "0")) and request.user.is_authenticated ): add_forge_requests = add_forge_requests.filter( submitter_name=request.user.username ) paginator = Paginator(add_forge_requests, per_page) page = paginator.page(page_num) if is_add_forge_now_moderator(request.user): requests = AddForgeNowRequestSerializer(page.object_list, many=True).data else: requests = AddForgeNowRequestPublicSerializer(page.object_list, many=True).data results = [dict(req) for req in requests] table_data["recordsFiltered"] = add_forge_requests.count() table_data["data"] = results return JsonResponse(table_data) FORGE_TYPES: List[str] = [ "bitbucket", "cgit", "gitlab", "gitea", "heptapod", ] def create_request_create(request): """View to create a new 'add_forge_now' request.""" return render( request, "add_forge_now/creation_form.html", {"forge_types": FORGE_TYPES}, ) def create_request_list(request): """View to list existing 'add_forge_now' requests.""" return render( request, "add_forge_now/list.html", ) def create_request_help(request): """View to explain 'add_forge_now'.""" return render( request, "add_forge_now/help.html", ) 
@user_passes_test( is_add_forge_now_moderator, redirect_field_name="next_path", login_url=settings.LOGIN_URL, ) def create_request_message_source(request: HttpRequest, id: int) -> HttpResponse: """View to retrieve the message source for a given request history entry""" try: history_entry = RequestHistory.objects.select_related("request").get( pk=id, message_source__isnull=False ) assert history_entry.message_source is not None except RequestHistory.DoesNotExist: return HttpResponse(status=404) response = HttpResponse( bytes(history_entry.message_source), content_type="text/email" ) filename = f"add-forge-now-{history_entry.request.forge_domain}-message{id}.eml" response["Content-Disposition"] = f'attachment; filename="{filename}"' return response urlpatterns = [ url( r"^add-forge/request/list/datatables/$", add_forge_request_list_datatables, name="add-forge-request-list-datatables", ), url(r"^add-forge/request/create/$", create_request_create, name="forge-add-create"), url(r"^add-forge/request/list/$", create_request_list, name="forge-add-list"), url( r"^add-forge/request/message-source/(?P<id>\d+)/$", create_request_message_source, name="forge-add-message-source", ), url(r"^add-forge/request/help/$", create_request_help, name="forge-add-help"), ] diff --git a/swh/web/admin/urls.py b/swh/web/admin/urls.py index 27014677..4b06b3b2 100644 --- a/swh/web/admin/urls.py +++ b/swh/web/admin/urls.py @@ -1,29 +1,29 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.conf.urls import url from django.contrib.auth.views import LoginView from django.shortcuts import redirect +from django.urls import re_path as url from swh.web.admin.adminurls import AdminUrls import swh.web.admin.deposit # noqa import swh.web.admin.mailmap # noqa import swh.web.admin.origin_save # 
noqa from swh.web.config import is_feature_enabled if is_feature_enabled("add_forge_now"): import swh.web.admin.add_forge_now # noqa def _admin_default_view(request): return redirect("admin-origin-save-requests") urlpatterns = [ url(r"^$", _admin_default_view, name="admin"), url(r"^login/$", LoginView.as_view(template_name="login.html"), name="login"), ] urlpatterns += AdminUrls.get_url_patterns() diff --git a/swh/web/auth/mailmap.py b/swh/web/auth/mailmap.py index 3108f062..b3f14a3c 100644 --- a/swh/web/auth/mailmap.py +++ b/swh/web/auth/mailmap.py @@ -1,204 +1,204 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict -from django.conf.urls import url from django.core.paginator import Paginator from django.db import IntegrityError from django.db.models import Q from django.http.request import HttpRequest from django.http.response import ( HttpResponse, HttpResponseBadRequest, HttpResponseNotFound, JsonResponse, ) +from django.urls import re_path as url from rest_framework import serializers from rest_framework.decorators import api_view from rest_framework.request import Request from rest_framework.response import Response from swh.web.auth.models import UserMailmap, UserMailmapEvent from swh.web.auth.utils import ( MAILMAP_ADMIN_PERMISSION, MAILMAP_PERMISSION, any_permission_required, ) class UserMailmapSerializer(serializers.ModelSerializer): class Meta: model = UserMailmap fields = "__all__" @api_view(["GET"]) @any_permission_required(MAILMAP_PERMISSION, MAILMAP_ADMIN_PERMISSION) def profile_list_mailmap(request: Request) -> HttpResponse: mailmap_admin = request.user.has_perm(MAILMAP_ADMIN_PERMISSION) mms = UserMailmap.objects.filter( user_id=None if mailmap_admin else str(request.user.id) ).all() return 
Response(UserMailmapSerializer(mms, many=True).data) @api_view(["POST"]) @any_permission_required(MAILMAP_PERMISSION, MAILMAP_ADMIN_PERMISSION) def profile_add_mailmap(request: Request) -> HttpResponse: mailmap_admin = request.user.has_perm(MAILMAP_ADMIN_PERMISSION) event = UserMailmapEvent.objects.create( user_id=str(request.user.id), request_type="add", request=json.dumps(request.data), ) from_email = request.data.pop("from_email", None) if not from_email: return HttpResponseBadRequest( "'from_email' must be provided and non-empty.", content_type="text/plain" ) user_id = None if mailmap_admin else str(request.user.id) from_email_verified = request.data.pop("from_email_verified", False) if mailmap_admin: # consider email verified when mailmap is added by admin from_email_verified = True try: UserMailmap.objects.create( user_id=user_id, from_email=from_email, from_email_verified=from_email_verified, **request.data, ) except IntegrityError as e: if ( "user_mailmap_from_email_key" in e.args[0] or "user_mailmap.from_email" in e.args[0] ): return HttpResponseBadRequest( "This 'from_email' already exists.", content_type="text/plain" ) else: raise event.successful = True event.save() mm = UserMailmap.objects.get(user_id=user_id, from_email=from_email) return Response(UserMailmapSerializer(mm).data) @api_view(["POST"]) @any_permission_required(MAILMAP_PERMISSION, MAILMAP_ADMIN_PERMISSION) def profile_update_mailmap(request: Request) -> HttpResponse: mailmap_admin = request.user.has_perm(MAILMAP_ADMIN_PERMISSION) event = UserMailmapEvent.objects.create( user_id=str(request.user.id), request_type="update", request=json.dumps(request.data), ) from_email = request.data.pop("from_email", None) if not from_email: return HttpResponseBadRequest( "'from_email' must be provided and non-empty.", content_type="text/plain" ) user_id = None if mailmap_admin else str(request.user.id) try: to_update = ( UserMailmap.objects.filter(user_id=user_id) .filter(from_email=from_email) .get() ) 
except UserMailmap.DoesNotExist: return HttpResponseNotFound("'from_email' cannot be found in mailmaps.") for attr, value in request.data.items(): setattr(to_update, attr, value) to_update.save() event.successful = True event.save() mm = UserMailmap.objects.get(user_id=user_id, from_email=from_email) return Response(UserMailmapSerializer(mm).data) @any_permission_required(MAILMAP_PERMISSION, MAILMAP_ADMIN_PERMISSION) def profile_list_mailmap_datatables(request: HttpRequest) -> HttpResponse: mailmap_admin = request.user.has_perm(MAILMAP_ADMIN_PERMISSION) mailmaps = UserMailmap.objects.filter( user_id=None if mailmap_admin else str(request.user.id) ) search_value = request.GET.get("search[value]", "") column_order = request.GET.get("order[0][column]") field_order = request.GET.get(f"columns[{column_order}][name]", "from_email") order_dir = request.GET.get("order[0][dir]", "asc") if order_dir == "desc": field_order = "-" + field_order mailmaps = mailmaps.order_by(field_order) table_data: Dict[str, Any] = {} table_data["draw"] = int(request.GET.get("draw", 1)) table_data["recordsTotal"] = mailmaps.count() length = int(request.GET.get("length", 10)) page = int(request.GET.get("start", 0)) / length + 1 if search_value: mailmaps = mailmaps.filter( Q(from_email__icontains=search_value) | Q(display_name__icontains=search_value) ) table_data["recordsFiltered"] = mailmaps.count() paginator = Paginator(mailmaps, length) mailmaps_data = [ UserMailmapSerializer(mm).data for mm in paginator.page(int(page)).object_list ] table_data["data"] = mailmaps_data return JsonResponse(table_data) urlpatterns = [ url( r"^profile/mailmap/list/$", profile_list_mailmap, name="profile-mailmap-list", ), url( r"^profile/mailmap/add/$", profile_add_mailmap, name="profile-mailmap-add", ), url( r"^profile/mailmap/update/$", profile_update_mailmap, name="profile-mailmap-update", ), url( r"^profile/mailmap/list/datatables/$", profile_list_mailmap_datatables, name="profile-mailmap-list-datatables", ), ] 
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index 2cf06021..bc2a2461 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,197 +1,197 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Union, cast from cryptography.fernet import InvalidToken -from django.conf.urls import url from django.contrib.auth.decorators import login_required from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( HttpResponse, HttpResponseBadRequest, HttpResponseForbidden, HttpResponseRedirect, JsonResponse, ) from django.shortcuts import render +from django.urls import re_path as url from django.views.decorators.http import require_http_methods from swh.auth.django.models import OIDCUser from swh.auth.django.utils import keycloak_oidc_client from swh.auth.django.views import get_oidc_login_data, oidc_login_view from swh.auth.django.views import urlpatterns as auth_urlpatterns from swh.auth.keycloak import KeycloakError, keycloak_error_message from swh.web.auth.models import OIDCUserOfflineTokens from swh.web.auth.utils import decrypt_data, encrypt_data from swh.web.common.exc import ForbiddenExc from swh.web.common.utils import reverse from swh.web.config import get_config from .mailmap import urlpatterns as mailmap_urlpatterns def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() redirect_uri = reverse("oidc-generate-bearer-token-complete", request=request) return oidc_login_view( request, redirect_uri=redirect_uri, scope="openid offline_access" ) def oidc_generate_bearer_token_complete(request: HttpRequest) -> HttpResponse: if not 
request.user.is_authenticated or not isinstance(request.user, OIDCUser): raise ForbiddenExc("You are not allowed to generate bearer tokens.") if "error" in request.GET: raise Exception(request.GET["error"]) login_data = get_oidc_login_data(request) oidc_client = keycloak_oidc_client() oidc_profile = oidc_client.authorization_code( code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) user = cast(OIDCUser, request.user) token = oidc_profile["refresh_token"] secret = get_config()["secret_key"].encode() salt = user.sub.encode() encrypted_token = encrypt_data(token.encode(), secret, salt) OIDCUserOfflineTokens.objects.create( user_id=str(user.id), offline_token=encrypted_token ).save() return HttpResponseRedirect(reverse("oidc-profile") + "#tokens") def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id)) tokens = tokens.order_by("-creation_date") length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(tokens, length) tokens_data = [ {"id": t.id, "creation_date": t.creation_date.isoformat()} for t in paginator.page(int(page)).object_list ] table_data: Dict[str, Any] = {} table_data["recordsTotal"] = len(tokens_data) table_data["draw"] = int(request.GET["draw"]) table_data["data"] = tokens_data table_data["recordsFiltered"] = len(tokens_data) return JsonResponse(table_data) def _encrypted_token_bytes(token: Union[bytes, memoryview]) -> bytes: # token has been retrieved from a PosgreSQL database if isinstance(token, memoryview): return token.tobytes() else: return token @require_http_methods(["POST"]) def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() 
try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data( _encrypted_token_bytes(token_data.offline_token), secret, salt ) refresh_token = decrypted_token.decode("ascii") # check token is still valid oidc_client = keycloak_oidc_client() oidc_client.refresh_token(refresh_token) return HttpResponse(refresh_token, content_type="text/plain") except InvalidToken: return HttpResponse(status=401) except KeycloakError as ke: error_msg = keycloak_error_message(ke) if error_msg in ( "invalid_grant: Offline session not active", "invalid_grant: Offline user session not found", ): error_msg = "Bearer token has expired, please generate a new one." return HttpResponseBadRequest(error_msg, content_type="text/plain") @require_http_methods(["POST"]) def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("ascii")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) secret = get_config()["secret_key"].encode() salt = user.sub.encode() decrypted_token = decrypt_data( _encrypted_token_bytes(token_data.offline_token), secret, salt ) oidc_client = keycloak_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) except InvalidToken: return HttpResponse(status=401) @login_required(login_url="/oidc/login/", redirect_field_name="next_path") def _oidc_profile_view(request: HttpRequest) -> HttpResponse: return render(request, "auth/profile.html") urlpatterns = ( auth_urlpatterns + [ url( r"^oidc/generate-bearer-token/$", oidc_generate_bearer_token, name="oidc-generate-bearer-token", ), url( 
r"^oidc/generate-bearer-token-complete/$", oidc_generate_bearer_token_complete, name="oidc-generate-bearer-token-complete", ), url( r"^oidc/list-bearer-token/$", oidc_list_bearer_tokens, name="oidc-list-bearer-tokens", ), url( r"^oidc/get-bearer-token/$", oidc_get_bearer_token, name="oidc-get-bearer-token", ), url( r"^oidc/revoke-bearer-tokens/$", oidc_revoke_bearer_tokens, name="oidc-revoke-bearer-tokens", ), url( r"^oidc/profile/$", _oidc_profile_view, name="oidc-profile", ), ] + mailmap_urlpatterns ) diff --git a/swh/web/browse/urls.py b/swh/web/browse/urls.py index 92efd002..bbad7d1e 100644 --- a/swh/web/browse/urls.py +++ b/swh/web/browse/urls.py @@ -1,64 +1,64 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.conf.urls import url from django.http import HttpRequest, HttpResponse from django.shortcuts import redirect, render +from django.urls import re_path as url from swh.web.browse.browseurls import BrowseUrls from swh.web.browse.identifiers import swhid_browse import swh.web.browse.views.content # noqa import swh.web.browse.views.directory # noqa import swh.web.browse.views.origin # noqa import swh.web.browse.views.release # noqa import swh.web.browse.views.revision # noqa import swh.web.browse.views.snapshot # noqa from swh.web.common.utils import origin_visit_types, reverse def _browse_help_view(request: HttpRequest) -> HttpResponse: return render( request, "browse/help.html", {"heading": "How to browse the archive ?"} ) def _browse_search_view(request: HttpRequest) -> HttpResponse: return render( request, "browse/search.html", { "heading": "Search software origins to browse", "visit_types": origin_visit_types(), }, ) def _browse_vault_view(request: HttpRequest) -> HttpResponse: return render( request, "browse/vault-ui.html", 
{"heading": "Download archive content from the Vault"}, ) def _browse_origin_save_view(request: HttpRequest) -> HttpResponse: return redirect(reverse("origin-save")) urlpatterns = [ url(r"^$", _browse_search_view), url(r"^help/$", _browse_help_view, name="browse-help"), url(r"^search/$", _browse_search_view, name="browse-search"), url(r"^vault/$", _browse_vault_view, name="browse-vault"), # for backward compatibility url(r"^origin/save/$", _browse_origin_save_view, name="browse-origin-save"), url( r"^(?Pswh:[0-9]+:[a-z]+:[0-9a-f]+.*)/$", swhid_browse, name="browse-swhid", ), ] urlpatterns += BrowseUrls.get_url_patterns() diff --git a/swh/web/common/urlsindex.py b/swh/web/common/urlsindex.py index 62bbe926..1469f54b 100644 --- a/swh/web/common/urlsindex.py +++ b/swh/web/common/urlsindex.py @@ -1,75 +1,75 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List -from django.conf.urls import url from django.shortcuts import redirect -import django.urls +from django.urls import URLPattern +from django.urls import re_path as url class UrlsIndex(object): """ Simple helper class for centralizing url patterns of a Django web application. Derived classes should override the 'scope' class attribute otherwise all declared patterns will be grouped under the default one. """ - _urlpatterns = {} # type: Dict[str, List[django.urls.URLPattern]] + _urlpatterns: Dict[str, List[URLPattern]] = {} scope = "default" @classmethod def add_url_pattern(cls, url_pattern, view, view_name=None): """ Class method that adds an url pattern to the current scope. 
Args: url_pattern: regex describing a Django url view: function implementing the Django view view_name: name of the view used to reverse the url """ if cls.scope not in cls._urlpatterns: cls._urlpatterns[cls.scope] = [] if view_name: cls._urlpatterns[cls.scope].append(url(url_pattern, view, name=view_name)) else: cls._urlpatterns[cls.scope].append(url(url_pattern, view)) @classmethod def add_redirect_for_checksum_args(cls, view_name, url_patterns, checksum_args): """ Class method that redirects to view with lowercase checksums when upper/mixed case checksums are passed as url arguments. Args: view_name (str): name of the view to redirect requests url_patterns (List[str]): regexps describing the view urls checksum_args (List[str]): url argument names corresponding to checksum values """ new_view_name = view_name + "-uppercase-checksum" for url_pattern in url_patterns: url_pattern_upper = url_pattern.replace("[0-9a-f]", "[0-9a-fA-F]") def view_redirect(request, *args, **kwargs): for checksum_arg in checksum_args: checksum_upper = kwargs[checksum_arg] kwargs[checksum_arg] = checksum_upper.lower() return redirect(view_name, *args, **kwargs) cls.add_url_pattern(url_pattern_upper, view_redirect, new_view_name) @classmethod def get_url_patterns(cls): """ Class method that returns the list of url pattern associated to the current scope. 
Returns: The list of url patterns associated to the current scope """ return cls._urlpatterns[cls.scope] diff --git a/swh/web/inbound_email/__init__.py b/swh/web/inbound_email/__init__.py index e69de29b..784d2587 100644 --- a/swh/web/inbound_email/__init__.py +++ b/swh/web/inbound_email/__init__.py @@ -0,0 +1,6 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +default_app_config = "swh.web.inbound_email.apps.InboundEmailConfig" diff --git a/swh/web/inbound_email/apps.py b/swh/web/inbound_email/apps.py index a022295c..3907162f 100644 --- a/swh/web/inbound_email/apps.py +++ b/swh/web/inbound_email/apps.py @@ -1,11 +1,11 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.apps import AppConfig class InboundEmailConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" - name = "inbound_email" + name = "swh.web.inbound_email" diff --git a/swh/web/inbound_email/signals.py b/swh/web/inbound_email/signals.py index ffe3bb80..ac673a4d 100644 --- a/swh/web/inbound_email/signals.py +++ b/swh/web/inbound_email/signals.py @@ -1,36 +1,36 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from enum import Enum, auto import django.dispatch -email_received = django.dispatch.Signal(providing_args=["message"]) +email_received = django.dispatch.Signal() """This signal is sent by the `process_inbound_email` management command. 
Arguments: message (:class:`email.message.EmailMessage`): the inbound email message Signal receivers must return an :class:`EmailProcessingStatus` value so that the management command knows if the email has been processed. Signal receivers will be called for all received emails and are expected to do their own filtering (e.g. using the original destination address). Receivers ignoring a message must return `EmailProcessingStatus.IGNORED` to let the management command know that the message hasn't been processed. """ class EmailProcessingStatus(Enum): """Return values for the email processing signal listeners""" PROCESSED = auto() """The email has been successfully processed""" FAILED = auto() """The email has been processed, but the processing failed""" IGNORED = auto() """The email has been ignored (e.g. unknown recipient)""" diff --git a/swh/web/misc/badges.py b/swh/web/misc/badges.py index 7fa846af..22dfe943 100644 --- a/swh/web/misc/badges.py +++ b/swh/web/misc/badges.py @@ -1,186 +1,186 @@ -# Copyright (C) 2019-2021 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import b64encode from typing import Optional, cast from pybadges import badge -from django.conf.urls import url from django.contrib.staticfiles import finders from django.http import HttpRequest, HttpResponse +from django.urls import re_path as url from swh.model.exceptions import ValidationError from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.swhids import CoreSWHID, ObjectType, QualifiedSWHID from swh.web.common import archive from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.common.identifiers import parse_object_type, resolve_swhid from swh.web.common.utils import reverse _orange = "#f36a24" _blue = 
"#0172b2" _red = "#cd5741" _swh_logo_data = None _badge_config = { "content": { "color": _blue, "title": "Archived source file", }, "directory": { "color": _blue, "title": "Archived source tree", }, "origin": { "color": _orange, "title": "Archived software repository", }, "release": { "color": _blue, "title": "Archived software release", }, "revision": { "color": _blue, "title": "Archived commit", }, "snapshot": { "color": _blue, "title": "Archived software repository snapshot", }, "error": {"color": _red, "title": "An error occurred when generating the badge"}, } def _get_logo_data() -> str: """ Get data-URI for Software Heritage SVG logo to embed it in the generated badges. """ global _swh_logo_data if _swh_logo_data is None: swh_logo_path = cast(str, finders.find("img/swh-logo-white.svg")) with open(swh_logo_path, "rb") as swh_logo_file: _swh_logo_data = "data:image/svg+xml;base64,%s" % b64encode( swh_logo_file.read() ).decode("ascii") return _swh_logo_data def _swh_badge( request: HttpRequest, object_type: str, object_id: str, object_swhid: Optional[str] = "", ) -> HttpResponse: """ Generate a Software Heritage badge for a given object type and id. Args: request: input http request object_type: The type of swh object to generate a badge for, either *content*, *directory*, *revision*, *release*, *origin* or *snapshot* object_id: The id of the swh object, either an url for origin type or a *sha1* for other object types object_swhid: If provided, the object SWHID will not be recomputed Returns: HTTP response with content type *image/svg+xml* containing the SVG badge data. If the provided parameters are invalid, HTTP 400 status code will be returned. If the object can not be found in the archive, HTTP 404 status code will be returned. 
""" left_text = "error" whole_link = None try: if object_type == "origin": archive.lookup_origin({"url": object_id}) right_text = "repository" whole_link = reverse( "browse-origin", query_params={"origin_url": object_id} ) else: # when SWHID is provided, object type and id will be parsed # from it if object_swhid: parsed_swhid = QualifiedSWHID.from_string(object_swhid) parsed_object_type = parsed_swhid.object_type object_id = hash_to_hex(parsed_swhid.object_id) swh_object = archive.lookup_object(parsed_swhid.object_type, object_id) # remove SWHID qualified if any for badge text right_text = str( CoreSWHID( object_type=parsed_swhid.object_type, object_id=parsed_swhid.object_id, ) ) object_type = parsed_swhid.object_type.name.lower() else: parsed_object_type = parse_object_type(object_type) right_text = str( CoreSWHID( object_type=parsed_object_type, object_id=hash_to_bytes(object_id), ) ) swh_object = archive.lookup_object(parsed_object_type, object_id) whole_link = resolve_swhid(str(right_text))["browse_url"] # use release name for badge text if parsed_object_type == ObjectType.RELEASE: right_text = "release %s" % swh_object["name"] left_text = "archived" except (BadInputExc, ValidationError): right_text = f'invalid {object_type if object_type else "object"} id' object_type = "error" except NotFoundExc: right_text = f'{object_type if object_type else "object"} not found' object_type = "error" badge_data = badge( left_text=left_text, right_text=right_text, right_color=_badge_config[object_type]["color"], whole_link=request.build_absolute_uri(whole_link), whole_title=_badge_config[object_type]["title"], logo=_get_logo_data(), embed_logo=True, ) return HttpResponse(badge_data, content_type="image/svg+xml") def _swh_badge_swhid(request: HttpRequest, object_swhid: str) -> HttpResponse: """ Generate a Software Heritage badge for a given object SWHID. 
Args: request (django.http.HttpRequest): input http request object_swhid (str): a SWHID of an archived object Returns: django.http.HttpResponse: An http response with content type *image/svg+xml* containing the SVG badge data. If any error occurs, a status code of 400 will be returned. """ return _swh_badge(request, "", "", object_swhid) urlpatterns = [ url( r"^badge/(?P<object_type>[a-z]+)/(?P<object_id>.+)/$", _swh_badge, name="swh-badge", ), url( r"^badge/(?P<object_swhid>swh:[0-9]+:[a-z]+:[0-9a-f]+.*)/$", _swh_badge_swhid, name="swh-badge-swhid", ), ] diff --git a/swh/web/misc/coverage.py b/swh/web/misc/coverage.py index d13c41cd..82c92179 100644 --- a/swh/web/misc/coverage.py +++ b/swh/web/misc/coverage.py @@ -1,501 +1,501 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter, defaultdict from typing import Any, Dict, List, Tuple from urllib.parse import urlparse -from django.conf.urls import url from django.http.request import HttpRequest from django.http.response import HttpResponse from django.shortcuts import render +from django.urls import re_path as url from django.views.decorators.cache import never_cache from django.views.decorators.clickjacking import xframe_options_exempt from swh.scheduler.model import SchedulerMetrics from swh.web.common import archive from swh.web.common.utils import ( django_cache, get_deposits_list, is_swh_web_development, is_swh_web_production, reverse, ) from swh.web.config import scheduler _swh_arch_overview_doc = ( "https://docs.softwareheritage.org/devel/architecture/overview.html" ) # Current coverage list of the archive in a high level overview fashion, # categorized as follow: # - listed origins: origins discovered using a swh lister # - legacy: origins where public hosting service has closed # - deposited: origins coming 
from swh-deposit # # TODO: Store that list in a database table somewhere (swh-scheduler, swh-storage ?) # and retrieve it dynamically listed_origins: Dict[str, Any] = { "info": ( "These software origins get continuously discovered and archived using " f'the listers implemented by Software Heritage.' ), "origins": [ { "type": "bitbucket", "info_url": "https://bitbucket.org", "info": "public repositories from Bitbucket", "search_pattern": { "default": "https://bitbucket.org/", }, }, { "type": "cgit", "info_url": "https://git.zx2c4.com/cgit/about", "info": "public repositories from cgit instances", "search_pattern": { "default": "cgit", }, }, { "type": "CRAN", "info_url": "https://cran.r-project.org", "info": "source packages from The Comprehensive R Archive Network", "search_pattern": { "default": "https://cran.r-project.org/", }, }, { "type": "debian", "info_url": "https://www.debian.org", "info": "source packages from Debian and Debian-based distributions", "search_pattern": { "default": "deb://", }, }, { "type": "gitea", "info_url": "https://gitea.io", "info": "public repositories from Gitea instances", "search_pattern": { "default": "gitea", }, }, { "type": "github", "info_url": "https://github.com", "info": "public repositories from GitHub", "search_pattern": { "default": "https://github.com/", }, }, { "type": "gitlab", "info_url": "https://gitlab.com", "info": "public repositories from multiple GitLab instances", "search_pattern": { "default": "gitlab", }, }, { "type": "guix", "info_url": "https://guix.gnu.org", "info": "source code tarballs used to build the Guix package collection", "visit_types": ["nixguix"], "search_pattern": { "default": "https://guix.gnu.org/sources.json", }, }, { "type": "GNU", "info_url": "https://www.gnu.org", "info": "releases from the GNU project (as of August 2015)", "search_pattern": { "default": "gnu", }, }, { "type": "heptapod", "info_url": "https://heptapod.net/", "info": "public repositories from multiple Heptapod instances", 
"search_pattern": { "default": "heptapod", }, }, { "type": "launchpad", "info_url": "https://launchpad.net", "logo": "img/logos/launchpad.png", "info": "public repositories from Launchpad", "search_pattern": { "default": "launchpad.net/", }, }, { "type": "maven", "info_url": "https://maven.apache.org/", "info": "java source packages from maven repositories", "search_pattern": { "default": "maven", "cvs": "", "git": "", "hg": "", "svn": "", }, }, { "type": "nixos", "info_url": "https://nixos.org", "info": "source code tarballs used to build the Nix package collection", "visit_types": ["nixguix"], "search_pattern": { "default": ( "https://nix-community.github.io/nixpkgs-swh/sources-unstable.json" ) }, }, { "type": "npm", "info_url": "https://www.npmjs.com", "info": "public packages from the package registry for javascript", "search_pattern": { "default": "https://www.npmjs.com", }, }, { "type": "opam", "info_url": "https://opam.ocaml.org/", "info": "public packages from the source-based package manager for OCaml", "search_pattern": { "default": "opam+https://", }, }, { "type": "Packagist", "info_url": "https://packagist.org/", "info": "source code repositories referenced by The PHP Package Repository", "search_pattern": { "default": "", }, }, { "type": "phabricator", "info_url": "https://www.phacility.com/phabricator", "info": "public repositories from multiple Phabricator instances", "search_pattern": { "default": "phabricator", }, }, { "type": "pypi", "info_url": "https://pypi.org", "info": "source packages from the Python Package Index", "search_pattern": { "default": "https://pypi.org", }, }, { "type": "sourceforge", "info_url": "https://sourceforge.net", "info": "public repositories from SourceForge", "search_pattern": { "default": "code.sf.net", "bzr": "bzr.sourceforge.net", "cvs": "cvs.sourceforge.net", }, }, ], } legacy_origins: Dict[str, Any] = { "info": ( "Discontinued hosting services. Those origins have been archived " "by Software Heritage." 
), "origins": [ { "type": "gitorious", "info_url": "https://en.wikipedia.org/wiki/Gitorious", "info": ( "public repositories from the former Gitorious code hosting service" ), "visit_types": ["git"], "search_pattern": "https://gitorious.org", "count": "122,014", }, { "type": "googlecode", "info_url": "https://code.google.com/archive", "info": ( "public repositories from the former Google Code project " "hosting service" ), "visit_types": ["git", "hg", "svn"], "search_pattern": "googlecode.com", "count": "790,026", }, { "type": "bitbucket", "info_url": "https://bitbucket.org", "info": "public repositories from Bitbucket", "search_pattern": "https://bitbucket.org/", "visit_types": ["hg"], "count": "336,795", }, ], } deposited_origins: Dict[str, Any] = { "info": ( "These origins are directly pushed into the archive by trusted partners " f'using the deposit service of Software Heritage.' ), "origins": [ { "type": "elife", "info_url": "https://elifesciences.org", "info": ( "research software source code associated to the articles " "eLife publishes" ), "search_pattern": "elife.stencila.io", "visit_types": ["deposit"], }, { "type": "hal", "info_url": "https://hal.archives-ouvertes.fr", "info": "scientific software source code deposited in the open archive HAL", "visit_types": ["deposit"], "search_pattern": "hal.archives-ouvertes.fr", }, { "type": "ipol", "info_url": "https://www.ipol.im", "info": "software artifacts associated to the articles IPOL publishes", "visit_types": ["deposit"], "search_pattern": "doi.org/10.5201", }, ], } _cache_timeout = 60 * 60 # one hour def _get_listers_metrics( cache_metrics: bool = False, ) -> Dict[str, List[Tuple[str, SchedulerMetrics]]]: """Returns scheduler metrics in the following mapping: Dict[lister_name, List[Tuple[instance_name, SchedulerMetrics]]] as a lister instance has one SchedulerMetrics object per visit type. 
def _get_deposits_netloc_counts(cache_counts: bool = False) -> Counter:
    """Return deposit counts per origin url network location.

    Only deposits whose ``status`` is ``"done"`` are counted.

    Args:
        cache_counts: when false, the cache invalidation predicate given to
            ``django_cache`` returns true, so counts are not kept in cache

    Returns:
        A counter mapping a network location (with the first path component
        appended for ``doi.org`` urls) to its number of deposits. An empty
        counter is the configured fallback value when computing counts fails.
    """

    def _process_origin_url(origin_url):
        parsed_url = urlparse(origin_url)
        netloc = parsed_url.netloc
        # special treatment for doi.org netloc as it is not specific enough
        # for origins mapping
        if netloc == "doi.org":
            path_parts = parsed_url.path.split("/")
            # an url without any path (e.g. "https://doi.org") has a single
            # empty part: do not let it abort the whole counting with an
            # IndexError, treat it like the "/" path instead
            netloc += "/" + (path_parts[1] if len(path_parts) > 1 else "")
        return netloc

    @django_cache(
        timeout=_cache_timeout,
        catch_exception=True,
        exception_return_value=Counter(),
        invalidate_cache_pred=lambda m: not cache_counts,
    )
    def _get_deposits_netloc_counts_internal():
        return Counter(
            _process_origin_url(deposit["origin_url"])
            for deposit in get_deposits_list()
            if deposit["status"] == "done"
        )

    return _get_deposits_netloc_counts_internal()
@xframe_options_exempt
@never_cache
def _swh_coverage(request: HttpRequest) -> HttpResponse:
    """Render the archive coverage page (``misc/coverage.html``).

    Origin counts, per instance details and search URLs are computed from
    the scheduler metrics, the nixguix snapshots and the deposits list, then
    merged into the three static origin tables declared at module level.

    The tables are deep copied first: the computed values are request
    specific and must not leak into the module-level definitions. In
    particular the filtering of origin types without archived origins would
    otherwise permanently remove entries from ``listed_origins`` for the
    lifetime of the process, e.g. after a single request served while no
    scheduler metrics could be fetched.

    Args:
        request: input http request, its optional ``focus`` query parameter
            is a comma separated list forwarded to the template

    Returns:
        The rendered coverage page.
    """
    # function-scope import to keep the module-level import list untouched
    import copy

    use_cache = is_swh_web_production(request)
    listers_metrics = _get_listers_metrics(use_cache)

    # per-request working copies of the static tables
    listed = copy.deepcopy(listed_origins)
    legacy = copy.deepcopy(legacy_origins)
    deposited = copy.deepcopy(deposited_origins)

    for origins in listed["origins"]:
        origins["count"] = "0"
        origins["instances"] = {}
        origins_type = origins["type"]

        # special processing for nixos/guix origins as there is no
        # scheduler metrics for those
        if origins_type in ("nixos", "guix"):
            count = _get_nixguix_origins_count(
                origins["search_pattern"]["default"], use_cache
            )

            origins["count"] = f"{count:,}"
            origins["instances"][origins_type] = {"nixguix": {"count": count}}

        if origins_type not in listers_metrics:
            continue

        type_metrics = listers_metrics[origins_type]
        count_total = sum(metrics.origins_enabled for _, metrics in type_metrics)
        count_never_visited = sum(
            metrics.origins_never_visited for _, metrics in type_metrics
        )
        count = count_total - count_never_visited

        origins["count"] = f"{count:,}"
        origins["instances"] = defaultdict(dict)
        for instance, metrics in type_metrics:
            instance_count = metrics.origins_enabled - metrics.origins_never_visited
            # no archived origins for that visit type, skip it
            if instance_count == 0:
                continue

            origins["instances"][instance].update(
                {metrics.visit_type: {"count": f"{instance_count:,}"}}
            )
            origins["visit_types"] = list(
                set(origins["instances"][instance].keys())
                | set(origins.get("visit_types", []))
            )

        if origins_type == "CRAN":
            origins["instances"]["cran"]["cran"] = {"count": origins["count"]}

        # defaultdict cannot be iterated in django template
        origins["instances"] = dict(origins["instances"])

    for origins in listed["origins"]:
        instances = origins["instances"]
        nb_instances = len(instances)
        for instance_name, visit_types in instances.items():
            for visit_type in visit_types:
                search_url = ""
                if visit_type in origins["search_pattern"]:
                    search_pattern = origins["search_pattern"][visit_type]
                elif nb_instances > 1:
                    search_pattern = instance_name
                else:
                    search_pattern = origins["search_pattern"]["default"]
                # an empty pattern means no search link for that visit type
                if search_pattern:
                    search_url = _search_url(search_pattern, visit_type)
                visit_types[visit_type]["search_url"] = search_url

    # filter out origin types without archived origins on production and staging
    if not is_swh_web_development(request):
        listed["origins"] = [o for o in listed["origins"] if o["count"] != "0"]

    for origins in legacy["origins"]:
        origins["search_urls"] = {}
        for visit_type in origins["visit_types"]:
            origins["search_urls"][visit_type] = _search_url(
                origins["search_pattern"], visit_type
            )

    deposits_counts = _get_deposits_netloc_counts(use_cache)

    for origins in deposited["origins"]:
        origins["count"] = "0"
        if origins["search_pattern"] in deposits_counts:
            origins["count"] = f"{deposits_counts[origins['search_pattern']]:,}"
        origins["search_urls"] = {
            "deposit": _search_url(origins["search_pattern"], "deposit")
        }

    focus = []
    focus_param = request.GET.get("focus")
    if focus_param:
        focus = focus_param.split(",")

    return render(
        request,
        "misc/coverage.html",
        {
            "origins": {
                "Regular crawling": listed,
                "Discontinued hosting": legacy,
                "On demand archival": deposited,
            },
            "focus": focus,
        },
    )
@xframe_options_exempt
def fundraising_banner(request):
    """Render the fundraising banner (``misc/fundraising-banner.html``).

    The number of donations is fetched from the Give API of
    www.softwareheritage.org using the ``give`` credentials found in the
    configuration. Fetching is best effort: on any failure the banner is
    still rendered, with ``nb_donations`` left to ``-1``.

    Args:
        request: input http request, its optional ``lang`` query parameter
            selects the banner language (``en`` when missing or empty)

    Returns:
        The rendered banner.
    """
    config = get_config()
    public_key = config["give"]["public_key"]
    token = config["give"]["token"]
    give_api_forms_url = (
        "https://www.softwareheritage.org/give-api/v1/forms/"
        f"?key={public_key}&token={token}&form=27047"
    )
    donations_goal = 100
    nb_donations = -1
    try:
        # bound the request duration: without a timeout an unresponsive
        # remote service would block the worker rendering the banner
        response = requests.get(give_api_forms_url, timeout=5)
        fundraising_form = response.json().get("forms", [])
        if fundraising_form:
            nb_donations = int(
                fundraising_form[0]
                .get("stats", {})
                .get("total", {})
                .get("donations", -1)
            )
    except Exception:
        # deliberately ignored, the banner must be displayed even when
        # donation statistics cannot be retrieved or parsed
        pass
    goal_percent = int(nb_donations / donations_goal * 100)

    lang = request.GET.get("lang")

    return render(
        request,
        "misc/fundraising-banner.html",
        {
            "nb_donations": nb_donations,
            "donations_goal": donations_goal,
            "goal_percent": goal_percent,
            "lang": lang if lang else "en",
            "donation_form_url": (
                "https://www.softwareheritage.org/donations/"
                "help-preserve-sourcecode-2021/"
            ),
        },
    )
def _get_directory_rendering_data(
    dir_swhid: QualifiedSWHID,
    focus_swhid: QualifiedSWHID,
    path: str,
) -> Dict[str, Any]:
    """Return the entries of a directory decorated with their iframe URLs.

    Args:
        dir_swhid: SWHID of the directory to display, its origin, visit and
            anchor qualifiers are propagated to the SWHID of each entry
        focus_swhid: SWHID forwarded as ``focus_swhid`` query parameter in
            every generated URL
        path: path of the directory, used as prefix for the path qualifier
            of its entries (``/`` when empty)

    Returns:
        A dict with the ``dirs`` and ``files`` entries of the directory, each
        entry getting a ``url`` key (``None`` for entries of type ``rev``).
    """
    dirs, files = get_directory_entries(dir_swhid.object_id.hex())
    base_path = path or "/"

    for d in dirs:
        if d["type"] == "rev":
            d["url"] = None
            continue
        # use a dedicated name: rebinding the dir_swhid parameter here would
        # make the next iterations read their qualifiers from a child SWHID
        subdir_swhid = QualifiedSWHID(
            object_type=ObjectType.DIRECTORY,
            object_id=hash_to_bytes(d["target"]),
            origin=dir_swhid.origin,
            visit=dir_swhid.visit,
            anchor=dir_swhid.anchor,
            path=base_path + d["name"] + "/",
        )
        d["url"] = reverse(
            "swhid-iframe",
            url_args={"swhid": str(subdir_swhid)},
            query_params={"focus_swhid": str(focus_swhid)},
        )

    for f in files:
        object_id = hash_to_bytes(f["target"])
        cnt_swhid = QualifiedSWHID(
            object_type=ObjectType.CONTENT,
            object_id=object_id,
            origin=dir_swhid.origin,
            visit=dir_swhid.visit,
            anchor=dir_swhid.anchor,
            path=base_path + f["name"],
            # lines qualifier is only kept for the content in focus
            lines=(focus_swhid.lines if object_id == focus_swhid.object_id else None),
        )
        f["url"] = reverse(
            "swhid-iframe",
            url_args={"swhid": str(cnt_swhid)},
            query_params={"focus_swhid": str(focus_swhid)},
        )

    return {"dirs": dirs, "files": files}
@xframe_options_exempt
def swhid_iframe(request, swhid: str):
    """Django view that can be embedded in an iframe to display objects
    archived by Software Heritage (currently contents and directories) in
    a minimalist Web UI.

    Args:
        request: input http request, its optional ``focus_swhid`` query
            parameter defaults to the ``swhid`` argument
        swhid: SWHID of the object to display

    Returns:
        The rendered ``misc/iframe.html`` template. The response status is
        200 on success, 400 for bad input or an unsupported object type,
        404 when an object is not found and 500 for any other error.
    """
    focus_swhid = request.GET.get("focus_swhid", swhid)
    # defaults used to render the template when an error occurs early
    parsed_swhid = None
    view_data = {}
    breadcrumbs: List[Dict[str, Any]] = []
    swh_objects = []
    snapshot_context = None
    swhids_info_extra_context = {}
    archive_link = None
    try:
        parsed_swhid = get_swhid(swhid)
        parsed_focus_swhid = get_swhid(focus_swhid)
        path = parsed_swhid.path.decode("utf-8") if parsed_swhid.path else ""

        snapshot_context = None
        revision_id = None
        # a revision anchor selects the revision of the snapshot context
        if (
            parsed_swhid.anchor
            and parsed_swhid.anchor.object_type == ObjectType.REVISION
        ):
            revision_id = parsed_swhid.anchor.object_id.hex()
        if parsed_swhid.origin or parsed_swhid.visit:
            snapshot_context = get_snapshot_context(
                origin_url=parsed_swhid.origin,
                snapshot_id=parsed_swhid.visit.object_id.hex()
                if parsed_swhid.visit
                else None,
                revision_id=revision_id,
            )

        error_info: Dict[str, Any] = {"status_code": 200, "description": ""}

        if parsed_swhid and parsed_swhid.object_type == ObjectType.CONTENT:
            view_data = _get_content_rendering_data(parsed_swhid, path)
            swh_objects.append(
                SWHObjectInfo(
                    object_type=ObjectType.CONTENT,
                    object_id=parsed_swhid.object_id.hex(),
                )
            )

        elif parsed_swhid and parsed_swhid.object_type == ObjectType.DIRECTORY:
            view_data = _get_directory_rendering_data(
                parsed_swhid, parsed_focus_swhid, path
            )
            swh_objects.append(
                SWHObjectInfo(
                    object_type=ObjectType.DIRECTORY,
                    object_id=parsed_swhid.object_id.hex(),
                )
            )

        elif parsed_swhid:
            # only contents and directories can be displayed
            error_info = {
                "status_code": 400,
                "description": (
                    f"Objects of type {parsed_swhid.object_type} are not supported"
                ),
            }

        swhids_info_extra_context["path"] = path
        if parsed_swhid and view_data:
            breadcrumbs, root_dir = _get_breacrumbs_data(
                parsed_swhid, parsed_focus_swhid, path, snapshot_context
            )

            # for a content, the breadcrumb before the last one is its
            # parent directory
            if parsed_swhid.object_type == ObjectType.CONTENT and len(breadcrumbs) > 1:
                swh_objects.append(
                    SWHObjectInfo(
                        object_type=ObjectType.DIRECTORY,
                        object_id=breadcrumbs[-2]["object_id"],
                    )
                )
                swhids_info_extra_context["path"] = breadcrumbs[-2]["path"]
                swhids_info_extra_context["filename"] = breadcrumbs[-1]["name"]

            if snapshot_context:
                swh_objects.append(
                    SWHObjectInfo(
                        object_type=ObjectType.REVISION,
                        object_id=snapshot_context["revision_id"] or "",
                    )
                )
                swh_objects.append(
                    SWHObjectInfo(
                        object_type=ObjectType.SNAPSHOT,
                        object_id=snapshot_context["snapshot_id"] or "",
                    )
                )

            archive_link = reverse("browse-swhid", url_args={"swhid": swhid})
            if (
                parsed_swhid.origin is None
                and parsed_swhid.visit is None
                and parsed_swhid.anchor is None
                and root_dir is not None
            ):
                # qualifier values cannot be used to get root directory from them,
                # we need to add it as anchor in the SWHID argument of the archive link
                root_dir_swhid = CoreSWHID(
                    object_type=ObjectType.DIRECTORY, object_id=hash_to_bytes(root_dir)
                )
                archive_swhid = QualifiedSWHID(
                    object_type=parsed_swhid.object_type,
                    object_id=parsed_swhid.object_id,
                    path=parsed_swhid.path,
                    anchor=root_dir_swhid,
                )
                archive_link = reverse(
                    "browse-swhid",
                    url_args={"swhid": f"{archive_swhid}"},
                )

    # errors are reported through the rendered template, the status code
    # of the response being set accordingly
    except BadInputExc as e:
        error_info = {"status_code": 400, "description": f"BadInputExc: {str(e)}"}
    except NotFoundExc as e:
        error_info = {"status_code": 404, "description": f"NotFoundExc: {str(e)}"}
    except Exception as e:
        error_info = {"status_code": 500, "description": str(e)}

    return render(
        request,
        "misc/iframe.html",
        {
            **view_data,
            "iframe_mode": True,
            "object_type": parsed_swhid.object_type.value if parsed_swhid else None,
            "lines": parsed_swhid.lines if parsed_swhid else None,
            "breadcrumbs": breadcrumbs,
            "swhid": swhid,
            "focus_swhid": focus_swhid,
            "archive_link": archive_link,
            "error_code": error_info["status_code"],
            "error_message": http_status_code_message.get(error_info["status_code"]),
            "error_description": error_info["description"],
            # the template always gets None here, the computed snapshot
            # context is only used to generate SWHIDs info below
            "snapshot_context": None,
            "swhids_info": get_swhids_info(
                swh_objects, snapshot_context, swhids_info_extra_context
            ),
        },
        status=error_info["status_code"],
    )
def _origin_save_requests_list(request, status):
    """Dedicated endpoint used by datatables to list save origin requests.

    Args:
        request: input http request, it must hold the ``draw``,
            ``search[value]``, ``order[0][column]``, ``order[0][dir]``,
            ``columns[<n>][name]``, ``length`` and ``start`` query
            parameters; ``user_requests_only`` is optional
        status: status of the requests to list, or ``all`` for no filtering

    Returns:
        A JSON response with the ``recordsTotal``, ``draw``,
        ``recordsFiltered`` and ``data`` keys.
    """
    if status != "all":
        save_requests = SaveOriginRequest.objects.filter(status=status)
    else:
        save_requests = SaveOriginRequest.objects.all()

    table_data = {}
    table_data["recordsTotal"] = save_requests.count()
    table_data["draw"] = int(request.GET["draw"])

    search_value = request.GET["search[value]"]

    column_order = request.GET["order[0][column]"]
    field_order = request.GET["columns[%s][name]" % column_order]
    order_dir = request.GET["order[0][dir]"]

    if order_dir == "desc":
        field_order = "-" + field_order

    save_requests = save_requests.order_by(field_order)

    length = int(request.GET["length"])
    # integer division: a true division yields a float and the paginator
    # rejects non integral page numbers when start is not a multiple of length
    page = int(request.GET["start"]) // length + 1

    if search_value:
        save_requests = save_requests.filter(
            Q(status__icontains=search_value)
            | Q(loading_task_status__icontains=search_value)
            | Q(visit_type__icontains=search_value)
            | Q(origin_url__icontains=search_value)
        )

    if (
        int(request.GET.get("user_requests_only", "0"))
        and request.user.is_authenticated
    ):
        save_requests = save_requests.filter(user_ids__contains=f'"{request.user.id}"')

    table_data["recordsFiltered"] = save_requests.count()
    paginator = Paginator(save_requests, length)
    table_data["data"] = [sor.to_dict() for sor in paginator.page(page).object_list]
    return JsonResponse(table_data)
def _jslicenses(request):
    """Render the ``misc/jslicenses.html`` page.

    License data are loaded from the ``jssources/jslicenses.json`` static
    file and passed to the template as a list of (key, value) pairs sorted
    by the last ``/`` separated component of the key.
    """
    jslicenses_file = finders.find("jssources/jslicenses.json")
    # context manager ensures the file gets closed once parsed
    with open(jslicenses_file, encoding="utf-8") as jslicenses_fd:
        jslicenses_data = json.load(jslicenses_fd)
    jslicenses_data = sorted(
        jslicenses_data.items(), key=lambda item: item[0].split("/")[-1]
    )
    return render(request, "misc/jslicenses.html", {"jslicenses_data": jslicenses_data})
from swh.web.tests.views import ( get_content_code_data_all_exts, get_content_code_data_all_filenames, get_content_code_data_by_ext, get_content_code_data_by_filename, get_content_other_data_by_ext, ) urlpatterns.append( url( r"^tests/data/content/code/extension/(?P.+)/$", get_content_code_data_by_ext, name="tests-content-code-extension", ) ) urlpatterns.append( url( r"^tests/data/content/other/extension/(?P.+)/$", get_content_other_data_by_ext, name="tests-content-other-extension", ) ) urlpatterns.append( url( r"^tests/data/content/code/extensions/$", get_content_code_data_all_exts, name="tests-content-code-extensions", ) ) urlpatterns.append( url( r"^tests/data/content/code/filename/(?P.+)/$", get_content_code_data_by_filename, name="tests-content-code-filename", ) ) urlpatterns.append( url( r"^tests/data/content/code/filenames/$", get_content_code_data_all_filenames, name="tests-content-code-filenames", ) ) diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py index b15d2f4c..115a175c 100644 --- a/swh/web/settings/common.py +++ b/swh/web/settings/common.py @@ -1,312 +1,322 @@ -# Copyright (C) 2017-2021 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information """ Django common settings for swh-web. """ import os import sys from typing import Any, Dict +from django.utils import encoding + from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.config import get_config +# Fix django-js-reverse 0.9.1 compatibility with django 4.x +# TODO: Remove that hack once a new django-js-reverse release +# is available on PyPI +if not hasattr(encoding, "force_text"): + setattr(encoding, "force_text", encoding.force_str) + swh_web_config = get_config() # Build paths inside the project like this: os.path.join(BASE_DIR, ...) 
PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = swh_web_config["secret_key"] # SECURITY WARNING: don't run with debug turned on in production! DEBUG = swh_web_config["debug"] DEBUG_PROPAGATE_EXCEPTIONS = swh_web_config["debug"] ALLOWED_HOSTS = ["127.0.0.1", "localhost"] + swh_web_config["allowed_hosts"] # Application definition INSTALLED_APPS = [ "django.contrib.admin", "django.contrib.auth", "django.contrib.contenttypes", "django.contrib.sessions", "django.contrib.messages", "django.contrib.staticfiles", "rest_framework", "swh.web.common", "swh.web.inbound_email", "swh.web.api", "swh.web.auth", "swh.web.browse", "swh.web.add_forge_now", "webpack_loader", "django_js_reverse", "corsheaders", ] MIDDLEWARE = [ "django.middleware.security.SecurityMiddleware", "django.contrib.sessions.middleware.SessionMiddleware", "corsheaders.middleware.CorsMiddleware", "django.middleware.common.CommonMiddleware", "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", "swh.auth.django.middlewares.OIDCSessionExpiredMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", "swh.web.common.middlewares.ThrottlingHeadersMiddleware", "swh.web.common.middlewares.ExceptionMiddleware", ] # Compress all assets (static ones and dynamically generated html) # served by django in a local development environment context. # In a production environment, assets compression will be directly # handled by web servers like apache or nginx. 
if swh_web_config["serve_assets"]: MIDDLEWARE.insert(0, "django.middleware.gzip.GZipMiddleware") ROOT_URLCONF = "swh.web.urls" TEMPLATES = [ { "BACKEND": "django.template.backends.django.DjangoTemplates", "DIRS": [os.path.join(PROJECT_DIR, "../templates")], "APP_DIRS": True, "OPTIONS": { "context_processors": [ "django.template.context_processors.debug", "django.template.context_processors.request", "django.contrib.auth.context_processors.auth", "django.contrib.messages.context_processors.messages", "swh.web.common.utils.context_processor", ], "libraries": { "swh_templatetags": "swh.web.common.swh_templatetags", }, }, }, ] DATABASES = { "default": { "ENGINE": "django.db.backends.sqlite3", "NAME": swh_web_config.get("development_db", ""), } } # Password validation # https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators AUTH_PASSWORD_VALIDATORS = [ { "NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator", # noqa }, { "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator", }, { "NAME": "django.contrib.auth.password_validation.CommonPasswordValidator", }, { "NAME": "django.contrib.auth.password_validation.NumericPasswordValidator", }, ] # Internationalization # https://docs.djangoproject.com/en/1.11/topics/i18n/ LANGUAGE_CODE = "en-us" TIME_ZONE = "UTC" USE_I18N = True USE_L10N = True USE_TZ = True # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/1.11/howto/static-files/ STATIC_URL = "/static/" # static folder location when swh-web has been installed with pip STATIC_DIR = os.path.join(sys.prefix, "share/swh/web/static") if not os.path.exists(STATIC_DIR): # static folder location when developping swh-web STATIC_DIR = os.path.join(PROJECT_DIR, "../../../static") STATICFILES_DIRS = [STATIC_DIR] INTERNAL_IPS = ["127.0.0.1"] throttle_rates = {} http_requests = ["GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"] throttling = swh_web_config["throttling"] for 
limiter_scope, limiter_conf in throttling["scopes"].items(): if "default" in limiter_conf["limiter_rate"]: throttle_rates[limiter_scope] = limiter_conf["limiter_rate"]["default"] # for backward compatibility else: throttle_rates[limiter_scope] = limiter_conf["limiter_rate"] # register sub scopes specific for HTTP request types for http_request in http_requests: if http_request in limiter_conf["limiter_rate"]: throttle_rates[limiter_scope + "_" + http_request.lower()] = limiter_conf[ "limiter_rate" ][http_request] REST_FRAMEWORK: Dict[str, Any] = { "DEFAULT_RENDERER_CLASSES": ( "rest_framework.renderers.JSONRenderer", "swh.web.api.renderers.YAMLRenderer", "rest_framework.renderers.TemplateHTMLRenderer", ), "DEFAULT_THROTTLE_CLASSES": ( "swh.web.api.throttling.SwhWebRateThrottle", "swh.web.api.throttling.SwhWebUserRateThrottle", ), "DEFAULT_THROTTLE_RATES": throttle_rates, "DEFAULT_AUTHENTICATION_CLASSES": [ "rest_framework.authentication.SessionAuthentication", "swh.auth.django.backends.OIDCBearerTokenAuthentication", ], "EXCEPTION_HANDLER": "swh.web.api.apiresponse.error_response_handler", } LOGGING = { "version": 1, "disable_existing_loggers": False, "filters": { "require_debug_false": { "()": "django.utils.log.RequireDebugFalse", }, "require_debug_true": { "()": "django.utils.log.RequireDebugTrue", }, }, "formatters": { "request": { "format": "[%(asctime)s] [%(levelname)s] %(request)s %(status_code)s", "datefmt": "%d/%b/%Y %H:%M:%S", }, "simple": { "format": "[%(asctime)s] [%(levelname)s] %(message)s", "datefmt": "%d/%b/%Y %H:%M:%S", }, "verbose": { "format": ( "[%(asctime)s] [%(levelname)s] %(name)s.%(funcName)s:%(lineno)s " "- %(message)s" ), "datefmt": "%d/%b/%Y %H:%M:%S", }, }, "handlers": { "console": { "level": "DEBUG", "filters": ["require_debug_true"], "class": "logging.StreamHandler", "formatter": "simple", }, "file": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": 
os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "simple", }, "file_request": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "request", }, "console_verbose": { "level": "DEBUG", "filters": ["require_debug_true"], "class": "logging.StreamHandler", "formatter": "verbose", }, "file_verbose": { "level": "WARNING", "filters": ["require_debug_false"], "class": "logging.FileHandler", "filename": os.path.join(swh_web_config["log_dir"], "swh-web.log"), "formatter": "verbose", }, "null": { "class": "logging.NullHandler", }, }, "loggers": { "": { "handlers": ["console_verbose", "file_verbose"], "level": "DEBUG" if DEBUG else "WARNING", }, "django": { "handlers": ["console"], "level": "DEBUG" if DEBUG else "WARNING", "propagate": False, }, "django.request": { "handlers": ["file_request"], "level": "DEBUG" if DEBUG else "WARNING", "propagate": False, }, "django.db.backends": {"handlers": ["null"], "propagate": False}, "django.utils.autoreload": { "level": "INFO", }, "swh.core.statsd": { "level": "INFO", }, }, } WEBPACK_LOADER = { "DEFAULT": { "CACHE": False, "BUNDLE_DIR_NAME": "./", "STATS_FILE": os.path.join(STATIC_DIR, "webpack-stats.json"), "POLL_INTERVAL": 0.1, "TIMEOUT": None, "IGNORE": [".+\\.hot-update.js", ".+\\.map"], } } LOGIN_URL = "/admin/login/" LOGIN_REDIRECT_URL = "admin" SESSION_ENGINE = "django.contrib.sessions.backends.cache" CACHES = { "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}, } JS_REVERSE_JS_MINIFY = False CORS_ORIGIN_ALLOW_ALL = True CORS_URLS_REGEX = r"^/(badge|api)/.*$" AUTHENTICATION_BACKENDS = [ "django.contrib.auth.backends.ModelBackend", "swh.auth.django.backends.OIDCAuthorizationCodePKCEBackend", ] SWH_AUTH_SERVER_URL = swh_web_config["keycloak"]["server_url"] SWH_AUTH_REALM_NAME = swh_web_config["keycloak"]["realm_name"] SWH_AUTH_CLIENT_ID = OIDC_SWH_WEB_CLIENT_ID 
SWH_AUTH_SESSION_EXPIRED_REDIRECT_VIEW = "logout" + +DEFAULT_AUTO_FIELD = "django.db.models.AutoField" diff --git a/swh/web/tests/api/test_throttling.py b/swh/web/tests/api/test_throttling.py index 40ec2c58..c0fef1f0 100644 --- a/swh/web/tests/api/test_throttling.py +++ b/swh/web/tests/api/test_throttling.py @@ -1,230 +1,230 @@ -# Copyright (C) 2017-2021 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -from django.conf.urls import url from django.test.utils import override_settings +from django.urls import re_path as url from rest_framework.decorators import api_view from rest_framework.response import Response from rest_framework.views import APIView from swh.web.api.throttling import ( API_THROTTLING_EXEMPTED_PERM, SwhWebRateThrottle, SwhWebUserRateThrottle, throttle_scope, ) from swh.web.settings.tests import ( scope1_limiter_rate, scope1_limiter_rate_post, scope2_limiter_rate, scope2_limiter_rate_post, scope3_limiter_rate, scope3_limiter_rate_post, ) from swh.web.tests.utils import create_django_permission from swh.web.urls import urlpatterns class MockViewScope1(APIView): throttle_classes = (SwhWebRateThrottle,) throttle_scope = "scope1" def get(self, request): return Response("foo_get") def post(self, request): return Response("foo_post") @api_view(["GET", "POST"]) @throttle_scope("scope2") def mock_view_scope2(request): if request.method == "GET": return Response("bar_get") elif request.method == "POST": return Response("bar_post") class MockViewScope3(APIView): throttle_classes = (SwhWebRateThrottle,) throttle_scope = "scope3" def get(self, request): return Response("foo_get") def post(self, request): return Response("foo_post") @api_view(["GET", "POST"]) @throttle_scope("scope3") def 
mock_view_scope3(request): if request.method == "GET": return Response("bar_get") elif request.method == "POST": return Response("bar_post") urlpatterns += [ url(r"^scope1_class/$", MockViewScope1.as_view()), url(r"^scope2_func/$", mock_view_scope2), url(r"^scope3_class/$", MockViewScope3.as_view()), url(r"^scope3_func/$", mock_view_scope3), ] def check_response(response, status_code, limit=None, remaining=None): assert response.status_code == status_code if limit is not None: assert response["X-RateLimit-Limit"] == str(limit) else: assert "X-RateLimit-Limit" not in response if remaining is not None: assert response["X-RateLimit-Remaining"] == str(remaining) else: assert "X-RateLimit-Remaining" not in response @override_settings(ROOT_URLCONF=__name__) def test_scope1_requests_are_throttled(api_client): """ Ensure request rate is limited in scope1 """ for i in range(scope1_limiter_rate): response = api_client.get("/scope1_class/") check_response(response, 200, scope1_limiter_rate, scope1_limiter_rate - i - 1) response = api_client.get("/scope1_class/") check_response(response, 429, scope1_limiter_rate, 0) for i in range(scope1_limiter_rate_post): response = api_client.post("/scope1_class/") check_response( response, 200, scope1_limiter_rate_post, scope1_limiter_rate_post - i - 1 ) response = api_client.post("/scope1_class/") check_response(response, 429, scope1_limiter_rate_post, 0) @override_settings(ROOT_URLCONF=__name__) def test_scope2_requests_are_throttled(api_client): """ Ensure request rate is limited in scope2 """ for i in range(scope2_limiter_rate): response = api_client.get("/scope2_func/") check_response(response, 200, scope2_limiter_rate, scope2_limiter_rate - i - 1) response = api_client.get("/scope2_func/") check_response(response, 429, scope2_limiter_rate, 0) for i in range(scope2_limiter_rate_post): response = api_client.post("/scope2_func/") check_response( response, 200, scope2_limiter_rate_post, scope2_limiter_rate_post - i - 1 ) response = 
api_client.post("/scope2_func/") check_response(response, 429, scope2_limiter_rate_post, 0) @override_settings(ROOT_URLCONF=__name__) def test_scope3_requests_are_throttled_exempted(api_client): """ Ensure request rate is not limited in scope3 as requests coming from localhost are exempted from rate limit. """ for _ in range(scope3_limiter_rate + 1): response = api_client.get("/scope3_class/") check_response(response, 200) for _ in range(scope3_limiter_rate_post + 1): response = api_client.post("/scope3_class/") check_response(response, 200) for _ in range(scope3_limiter_rate + 1): response = api_client.get("/scope3_func/") check_response(response, 200) for _ in range(scope3_limiter_rate_post + 1): response = api_client.post("/scope3_func/") check_response(response, 200) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db def test_staff_users_are_not_rate_limited(api_client, staff_user): api_client.force_login(staff_user) for _ in range(scope2_limiter_rate + 1): response = api_client.get("/scope2_func/") check_response(response, 200) for _ in range(scope2_limiter_rate_post + 1): response = api_client.post("/scope2_func/") check_response(response, 200) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db def test_non_staff_users_are_rate_limited(api_client, regular_user): api_client.force_login(regular_user) scope2_limiter_rate_user = ( scope2_limiter_rate * SwhWebUserRateThrottle.NUM_REQUESTS_FACTOR ) for i in range(scope2_limiter_rate_user): response = api_client.get("/scope2_func/") check_response( response, 200, scope2_limiter_rate_user, scope2_limiter_rate_user - i - 1 ) response = api_client.get("/scope2_func/") check_response(response, 429, scope2_limiter_rate_user, 0) scope2_limiter_rate_post_user = ( scope2_limiter_rate_post * SwhWebUserRateThrottle.NUM_REQUESTS_FACTOR ) for i in range(scope2_limiter_rate_post_user): response = api_client.post("/scope2_func/") check_response( response, 200, scope2_limiter_rate_post_user, 
scope2_limiter_rate_post_user - i - 1, ) response = api_client.post("/scope2_func/") check_response(response, 429, scope2_limiter_rate_post_user, 0) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db def test_users_with_throttling_exempted_perm_are_not_rate_limited( api_client, regular_user ): regular_user.user_permissions.add( create_django_permission(API_THROTTLING_EXEMPTED_PERM) ) assert regular_user.has_perm(API_THROTTLING_EXEMPTED_PERM) api_client.force_login(regular_user) for _ in range(scope2_limiter_rate + 1): response = api_client.get("/scope2_func/") check_response(response, 200) for _ in range(scope2_limiter_rate_post + 1): response = api_client.post("/scope2_func/") check_response(response, 200) diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py index 38551e6a..e6a980d1 100644 --- a/swh/web/tests/common/test_utils.py +++ b/swh/web/tests/common/test_utils.py @@ -1,392 +1,393 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information + from base64 import b64encode import datetime import math import sys from urllib.parse import quote import pytest -from django.conf.urls import url from django.test.utils import override_settings +from django.urls import re_path as url from django.urls.exceptions import NoReverseMatch from swh.web.common import utils from swh.web.common.exc import BadInputExc from swh.web.config import SWH_WEB_SERVER_NAME, SWH_WEB_STAGING_SERVER_NAMES, get_config def test_shorten_path_noop(): noops = ["/api/", "/browse/", "/content/symbol/foobar/"] for noop in noops: assert utils.shorten_path(noop) == noop def test_shorten_path_sha1(): sha1 = "aafb16d69fd30ff58afdd69036a26047f3aebdc6" short_sha1 = sha1[:8] + "..." 
templates = [ "/api/1/content/sha1:%s/", "/api/1/content/sha1_git:%s/", "/api/1/directory/%s/", "/api/1/content/sha1:%s/ctags/", ] for template in templates: assert utils.shorten_path(template % sha1) == template % short_sha1 def test_shorten_path_sha256(): sha256 = "aafb16d69fd30ff58afdd69036a26047" "213add102934013a014dfca031c41aef" short_sha256 = sha256[:8] + "..." templates = [ "/api/1/content/sha256:%s/", "/api/1/directory/%s/", "/api/1/content/sha256:%s/filetype/", ] for template in templates: assert utils.shorten_path(template % sha256) == template % short_sha256 @pytest.mark.parametrize( "input_timestamp, output_date", [ ( "2016-01-12", datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc), ), ( "2016-01-12T09:19:12+0100", datetime.datetime(2016, 1, 12, 8, 19, 12, tzinfo=datetime.timezone.utc), ), ( "2007-01-14T20:34:22Z", datetime.datetime(2007, 1, 14, 20, 34, 22, tzinfo=datetime.timezone.utc), ), ], ) def test_parse_iso8601_date_to_utc_ok(input_timestamp, output_date): assert utils.parse_iso8601_date_to_utc(input_timestamp) == output_date @pytest.mark.parametrize( "invalid_iso8601_timestamp", ["Today is January 1, 2047 at 8:21:00AM", "1452591542"] ) def test_parse_iso8601_date_to_utc_ko(invalid_iso8601_timestamp): with pytest.raises(BadInputExc): utils.parse_iso8601_date_to_utc(invalid_iso8601_timestamp) def test_format_utc_iso_date(): assert ( utils.format_utc_iso_date("2017-05-04T13:27:13+02:00") == "04 May 2017, 11:27:13 UTC" ) def test_gen_path_info(): input_path = "/home/user/swh-environment/swh-web/" expected_result = [ {"name": "home", "path": "home"}, {"name": "user", "path": "home/user"}, {"name": "swh-environment", "path": "home/user/swh-environment"}, {"name": "swh-web", "path": "home/user/swh-environment/swh-web"}, ] path_info = utils.gen_path_info(input_path) assert path_info == expected_result input_path = "home/user/swh-environment/swh-web" path_info = utils.gen_path_info(input_path) assert path_info == expected_result def 
test_rst_to_html(): rst = ( "Section\n" "=======\n\n" "**Some strong text**\n\n" "* This is a bulleted list.\n" "* It has two items, the second\n" " item uses two lines.\n" "\n" "1. This is a numbered list.\n" "2. It has two items too.\n" "\n" "#. This is a numbered list.\n" "#. It has two items too.\n" ) expected_html = ( '

Section

\n' "

Some strong text

\n" '
    \n' "
  • This is a bulleted list.

  • \n" "
  • It has two items, the second\n" "item uses two lines.

  • \n" "
\n" '
    \n' "
  1. This is a numbered list.

  2. \n" "
  3. It has two items too.

  4. \n" "
  5. This is a numbered list.

  6. \n" "
  7. It has two items too.

  8. \n" "
\n" "
" ) assert utils.rst_to_html(rst) == expected_html def sample_test_view(request, string, number): pass def sample_test_view_no_url_args(request): pass urlpatterns = [ url( r"^sample/test/(?P.+)/view/(?P[0-9]+)/$", sample_test_view, name="sample-test-view", ), url( r"^sample/test/view/no/url/args/$", sample_test_view_no_url_args, name="sample-test-view-no-url-args", ), ] @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_only_ok(): string = "foo" number = 55 url = utils.reverse( "sample-test-view", url_args={"string": string, "number": number} ) assert url == f"/sample/test/{string}/view/{number}/" @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_only_ko(): string = "foo" with pytest.raises(NoReverseMatch): utils.reverse("sample-test-view", url_args={"string": string, "number": string}) @override_settings(ROOT_URLCONF=__name__) def test_reverse_no_url_args(): url = utils.reverse("sample-test-view-no-url-args") assert url == "/sample/test/view/no/url/args/" @override_settings(ROOT_URLCONF=__name__) def test_reverse_query_params_only(): start = 0 scope = "foo" url = utils.reverse( "sample-test-view-no-url-args", query_params={"start": start, "scope": scope} ) assert url == f"/sample/test/view/no/url/args/?scope={scope}&start={start}" url = utils.reverse( "sample-test-view-no-url-args", query_params={"start": start, "scope": None} ) assert url == f"/sample/test/view/no/url/args/?start={start}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_query_params_encode(): libname = "libstc++" url = utils.reverse( "sample-test-view-no-url-args", query_params={"libname": libname} ) assert url == f"/sample/test/view/no/url/args/?libname={quote(libname, safe='/;:')}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_url_args_query_params(): string = "foo" number = 55 start = 10 scope = "bar" url = utils.reverse( "sample-test-view", url_args={"string": string, "number": number}, query_params={"start": start, "scope": scope}, ) 
assert url == f"/sample/test/{string}/view/{number}/?scope={scope}&start={start}" @override_settings(ROOT_URLCONF=__name__) def test_reverse_absolute_uri(request_factory): request = request_factory.get(utils.reverse("sample-test-view-no-url-args")) url = utils.reverse("sample-test-view-no-url-args", request=request) assert url == f"http://{request.META['SERVER_NAME']}/sample/test/view/no/url/args/" def test_get_deposits_list(requests_mock): deposits_data = { "count": 2, "results": [ { "check_task_id": "351820217", "client": 2, "collection": 1, "complete_date": "2021-01-21T07:52:19.919312Z", "external_id": "hal-03116143", "id": 1412, "load_task_id": "351820260", "origin_url": "https://hal.archives-ouvertes.fr/hal-03116143", "parent": None, "reception_date": "2021-01-21T07:52:19.471019Z", "status": "done", "status_detail": None, "swhid": "swh:1:dir:f25157ad1b13cb20ac3457d4f6756b49ac63d079", }, { "check_task_id": "381576507", "client": 2, "collection": 1, "complete_date": "2021-07-07T08:00:44.726676Z", "external_id": "hal-03275052", "id": 1693, "load_task_id": "381576508", "origin_url": "https://hal.archives-ouvertes.fr/hal-03275052", "parent": None, "reception_date": "2021-07-07T08:00:44.327661Z", "status": "done", "status_detail": None, "swhid": "swh:1:dir:825fa96d1810177ec08a772ffa5bd34bbd08b89c", }, ], } config = get_config()["deposit"] private_api_url = config["private_api_url"].rstrip("/") + "/" deposits_list_url = private_api_url + "deposits" basic_auth_payload = ( config["private_api_user"] + ":" + config["private_api_password"] ).encode() requests_mock.get( deposits_list_url, json=deposits_data, request_headers={ "Authorization": f"Basic {b64encode(basic_auth_payload).decode('ascii')}" }, ) assert utils.get_deposits_list() == deposits_data["results"] @pytest.mark.parametrize("backend", ["swh-search", "swh-storage"]) def test_origin_visit_types(mocker, backend): if backend != "swh-search": # equivalent to not configuring search in the config search = 
mocker.patch("swh.web.common.utils.search") search.return_value = None assert utils.origin_visit_types() == [] else: # see swh/web/tests/data.py for origins added for tests assert utils.origin_visit_types() == ["git", "tar"] @pytest.mark.parametrize("server_name", ["localhost", "127.0.0.1", "testserver"]) def test_is_swh_web_development(request_factory, server_name): request = request_factory.get("/", SERVER_NAME=server_name) assert utils.is_swh_web_development(request) @pytest.mark.parametrize("server_name", SWH_WEB_STAGING_SERVER_NAMES) def test_is_swh_web_staging(request_factory, server_name): request = request_factory.get("/", SERVER_NAME=server_name) assert utils.is_swh_web_staging(request) def test_is_swh_web_production(request_factory): request = request_factory.get("/", SERVER_NAME=SWH_WEB_SERVER_NAME) assert utils.is_swh_web_production(request) def add(x, y): return x + y def test_django_cache(mocker): """Decorated function should be called once and returned value put in django cache.""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache()(add) val = cached_add(1, 2) val2 = cached_add(1, 2) assert val == val2 == 3 assert spy_add.call_count == 1 assert spy_cache_set.call_count == 1 def test_django_cache_invalidate_cache_pred(mocker): """Decorated function should be called twice and returned value put in django cache twice.""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache(invalidate_cache_pred=lambda val: val == 3)(add) val = cached_add(1, 2) val2 = cached_add(1, 2) assert val == val2 == 3 assert spy_add.call_count == 2 assert spy_cache_set.call_count == 2 def test_django_cache_raise_exception(mocker): """Decorated function should be called twice, exceptions should be raised and no value put in django cache""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = 
mocker.spy(utils.cache, "set") cached_add = utils.django_cache()(add) with pytest.raises(TypeError): cached_add(1, "2") with pytest.raises(TypeError): cached_add(1, "2") assert spy_add.call_count == 2 assert spy_cache_set.call_count == 0 def test_django_cache_catch_exception(mocker): """Decorated function should be called twice, exceptions should not be raised, specified fallback value should be returned and no value put in django cache""" spy_add = mocker.spy(sys.modules[__name__], "add") spy_cache_set = mocker.spy(utils.cache, "set") cached_add = utils.django_cache( catch_exception=True, exception_return_value=math.nan )(add) val = cached_add(1, "2") val2 = cached_add(1, "2") assert math.isnan(val) assert math.isnan(val2) assert spy_add.call_count == 2 assert spy_cache_set.call_count == 0 diff --git a/swh/web/urls.py b/swh/web/urls.py index dde8db87..97ec4eb3 100644 --- a/swh/web/urls.py +++ b/swh/web/urls.py @@ -1,86 +1,80 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django_js_reverse.views import urls_js from django.conf import settings -from django.conf.urls import ( - handler400, - handler403, - handler404, - handler500, - include, - url, -) +from django.conf.urls import handler400, handler403, handler404, handler500, include from django.contrib.auth.views import LogoutView from django.contrib.staticfiles.views import serve from django.shortcuts import render +from django.urls import re_path as url from django.views.generic.base import RedirectView from swh.web.browse.identifiers import swhid_browse from swh.web.common.exc import ( swh_handle400, swh_handle403, swh_handle404, swh_handle500, ) from swh.web.common.utils import origin_visit_types from swh.web.config import get_config, is_feature_enabled swh_web_config = get_config() favicon_view 
= RedirectView.as_view( url="/static/img/icons/swh-logo-32x32.png", permanent=True ) def _default_view(request): return render(request, "homepage.html", {"visit_types": origin_visit_types()}) urlpatterns = [ url(r"^admin/", include("swh.web.admin.urls")), url(r"^favicon\.ico/$", favicon_view), url(r"^api/", include("swh.web.api.urls")), url(r"^browse/", include("swh.web.browse.urls")), url(r"^$", _default_view, name="swh-web-homepage"), url(r"^jsreverse/$", urls_js, name="js_reverse"), # keep legacy SWHID resolving URL with trailing slash for backward compatibility url( r"^(?P(swh|SWH):[0-9]+:[A-Za-z]+:[0-9A-Fa-f]+.*)/$", swhid_browse, name="browse-swhid-legacy", ), url( r"^(?P(swh|SWH):[0-9]+:[A-Za-z]+:[0-9A-Fa-f]+.*)$", swhid_browse, name="browse-swhid", ), url(r"^", include("swh.web.misc.urls")), url(r"^", include("swh.web.auth.views")), url(r"^logout/$", LogoutView.as_view(template_name="logout.html"), name="logout"), ] if is_feature_enabled("add_forge_now"): urlpatterns += (url(r"^", include("swh.web.add_forge_now.views")),) # allow to serve assets through django staticfiles # even if settings.DEBUG is False def insecure_serve(request, path, **kwargs): return serve(request, path, insecure=True, **kwargs) # enable to serve compressed assets through django development server if swh_web_config["serve_assets"]: static_pattern = r"^%s(?P.*)/$" % settings.STATIC_URL[1:] urlpatterns.append(url(static_pattern, insecure_serve)) handler400 = swh_handle400 # noqa handler403 = swh_handle403 # noqa handler404 = swh_handle404 # noqa handler500 = swh_handle500 # noqa