diff --git a/swh/web/add_forge_now/views.py b/swh/web/add_forge_now/views.py index 1d617f4a..35a1a492 100644 --- a/swh/web/add_forge_now/views.py +++ b/swh/web/add_forge_now/views.py @@ -1,107 +1,107 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, List from django.conf.urls import url from django.core.paginator import Paginator from django.db.models import Q from django.http.request import HttpRequest from django.http.response import HttpResponse, JsonResponse from django.shortcuts import render from swh.web.add_forge_now.models import Request as AddForgeRequest from swh.web.api.views.add_forge_now import ( AddForgeNowRequestPublicSerializer, AddForgeNowRequestSerializer, ) from swh.web.auth.utils import ADD_FORGE_MODERATOR_PERMISSION def add_forge_request_list_datatables(request: HttpRequest) -> HttpResponse: """Dedicated endpoint used by datatables to display the add-forge requests in the Web UI. """ draw = int(request.GET.get("draw", 0)) add_forge_requests = AddForgeRequest.objects.all() table_data: Dict[str, Any] = { "recordsTotal": add_forge_requests.count(), "draw": draw, } search_value = request.GET.get("search[value]") column_order = request.GET.get("order[0][column]") field_order = request.GET.get(f"columns[{column_order}][name]", "id") order_dir = request.GET.get("order[0][dir]", "desc") if field_order: if order_dir == "desc": field_order = "-" + field_order add_forge_requests = add_forge_requests.order_by(field_order) per_page = int(request.GET.get("length", 10)) page_num = int(request.GET.get("start", 0)) // per_page + 1 if search_value: add_forge_requests = add_forge_requests.filter( Q(forge_type__icontains=search_value) | Q(forge_url__icontains=search_value) | Q(status__icontains=search_value) ) if ( int(request.GET.get("user_requests_only", "0")) and request.user.is_authenticated ): add_forge_requests = add_forge_requests.filter( submitter_name=request.user.username ) paginator = Paginator(add_forge_requests, per_page) page = paginator.page(page_num) if request.user.has_perm(ADD_FORGE_MODERATOR_PERMISSION): requests = AddForgeNowRequestSerializer(page.object_list, many=True).data else: requests = AddForgeNowRequestPublicSerializer(page.object_list, many=True).data results = [dict(request) for request in requests] table_data["recordsFiltered"] = add_forge_requests.count() table_data["data"] = results return JsonResponse(table_data) FORGE_TYPES: List[str] = [ "bitbucket", "cgit", "gitlab", "gitea", "heptapod", ] def create_request(request): """View to create a new 'add_forge_now' request. """ return render( request, "add_forge_now/create-request.html", {"forge_types": FORGE_TYPES}, ) urlpatterns = [ url( - r"^add-forge/request/list/datatables$", + r"^add-forge/request/list/datatables/$", add_forge_request_list_datatables, name="add-forge-request-list-datatables", ), - url(r"^add-forge/request/create$", create_request, name="forge-add"), + url(r"^add-forge/request/create/$", create_request, name="forge-add"), ] diff --git a/swh/web/api/views/add_forge_now.py b/swh/web/api/views/add_forge_now.py index 6c232c94..e2eebc36 100644 --- a/swh/web/api/views/add_forge_now.py +++ b/swh/web/api/views/add_forge_now.py @@ -1,355 +1,355 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Union from django.core.exceptions import ObjectDoesNotExist from django.core.paginator import Paginator from django.db import transaction from django.forms import CharField, ModelForm from django.http import HttpResponseBadRequest from django.http.request import HttpRequest from django.http.response import HttpResponse, HttpResponseForbidden from rest_framework import serializers from rest_framework.request import Request from rest_framework.response import Response from swh.web.add_forge_now.models import Request as AddForgeRequest from swh.web.add_forge_now.models import RequestActorRole as AddForgeNowRequestActorRole from swh.web.add_forge_now.models import RequestHistory as AddForgeNowRequestHistory from swh.web.add_forge_now.models import RequestStatus as AddForgeNowRequestStatus from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.auth.utils import ADD_FORGE_MODERATOR_PERMISSION from swh.web.common.exc import BadInputExc from swh.web.common.utils import reverse def _block_while_testing(): """Replaced by tests to check concurrency behavior """ pass class AddForgeNowRequestForm(ModelForm): class Meta: model = AddForgeRequest fields = ( "forge_type", "forge_url", "forge_contact_email", "forge_contact_name", "forge_contact_comment", ) class AddForgeNowRequestHistoryForm(ModelForm): new_status = CharField(max_length=200, required=False,) class Meta: model = AddForgeNowRequestHistory fields = ("text", "new_status") class AddForgeNowRequestSerializer(serializers.ModelSerializer): class Meta: model = AddForgeRequest fields = "__all__" class AddForgeNowRequestPublicSerializer(serializers.ModelSerializer): """Serializes AddForgeRequest without private fields. """ class Meta: model = AddForgeRequest fields = ("id", "forge_url", "forge_type", "status", "submission_date") class AddForgeNowRequestHistorySerializer(serializers.ModelSerializer): class Meta: model = AddForgeNowRequestHistory exclude = ("request",) class AddForgeNowRequestHistoryPublicSerializer(serializers.ModelSerializer): class Meta: model = AddForgeNowRequestHistory fields = ("id", "date", "new_status", "actor_role") @api_route( - r"/add-forge/request/create", "api-1-add-forge-request-create", methods=["POST"], + r"/add-forge/request/create/", "api-1-add-forge-request-create", methods=["POST"], ) @api_doc("/add-forge/request/create") @format_docstring() @transaction.atomic def api_add_forge_request_create(request: Union[HttpRequest, Request]) -> HttpResponse: """ .. http:post:: /api/1/add-forge/request/create/ Create a new request to add a forge to the list of those crawled regularly by Software Heritage. .. warning:: That endpoint is not publicly available and requires authentication in order to be able to request it. {common_headers} :[0-9]+)/update/", "api-1-add-forge-request-update", methods=["POST"], ) @api_doc("/add-forge/request/update", tags=["hidden"]) @format_docstring() @transaction.atomic def api_add_forge_request_update( request: Union[HttpRequest, Request], id: int ) -> HttpResponse: """ .. http:post:: /api/1/add-forge/request/update/ Update a request to add a forge to the list of those crawled regularly by Software Heritage. .. warning:: That endpoint is not publicly available and requires authentication in order to be able to request it. {common_headers} :[0-9]+)/get", + r"/add-forge/request/(?P[0-9]+)/get/", "api-1-add-forge-request-get", methods=["GET"], ) @api_doc("/add-forge/request/get") @format_docstring() def api_add_forge_request_get(request: Request, id: int): """ .. http:get:: /api/1/add-forge/request/get/ Return all details about an add-forge request. {common_headers} :param int id: add-forge request identifier :statuscode 200: request details successfully returned :statuscode 400: request identifier does not exist """ try: add_forge_request = AddForgeRequest.objects.get(id=id) except ObjectDoesNotExist: raise BadInputExc("Request id does not exist") request_history = AddForgeNowRequestHistory.objects.filter( request=add_forge_request ).order_by("id") if request.user.is_authenticated and request.user.has_perm( ADD_FORGE_MODERATOR_PERMISSION ): data = AddForgeNowRequestSerializer(add_forge_request).data history = AddForgeNowRequestHistorySerializer(request_history, many=True).data else: data = AddForgeNowRequestPublicSerializer(add_forge_request).data history = AddForgeNowRequestHistoryPublicSerializer( request_history, many=True ).data return {"request": data, "history": history} diff --git a/swh/web/api/views/origin.py b/swh/web/api/views/origin.py index 9da0c357..0f3774ff 100644 --- a/swh/web/api/views/origin.py +++ b/swh/web/api/views/origin.py @@ -1,480 +1,480 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from distutils.util import strtobool from functools import partial from swh.search.exc import SearchQuerySyntaxError from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.api.utils import ( enrich_origin, enrich_origin_search_result, enrich_origin_visit, ) from swh.web.api.views.utils import api_lookup from swh.web.common import archive from swh.web.common.exc import BadInputExc from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import reverse DOC_RETURN_ORIGIN = """ :>json string origin_visits_url: link to in order to get information about the visits for that origin :>json string url: the origin canonical url """ DOC_RETURN_ORIGIN_ARRAY = DOC_RETURN_ORIGIN.replace(":>json", ":>jsonarr") DOC_RETURN_ORIGIN_VISIT = """ :>json string date: ISO8601/RFC3339 representation of the visit date (in UTC) :>json str origin: the origin canonical url :>json string origin_url: link to get information about the origin :>jsonarr string snapshot: the snapshot identifier of the visit (may be null if status is not **full**). :>jsonarr string snapshot_url: link to :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get information about the snapshot of the visit (may be null if status is not **full**). :>json string status: status of the visit (either **full**, **partial** or **ongoing**) :>json number visit: the unique identifier of the visit """ DOC_RETURN_ORIGIN_VISIT_ARRAY = DOC_RETURN_ORIGIN_VISIT.replace(":>json", ":>jsonarr") DOC_RETURN_ORIGIN_VISIT_ARRAY += """ :>jsonarr number id: the unique identifier of the origin :>jsonarr string origin_visit_url: link to :http:get:`/api/1/origin/(origin_url)/visit/(visit_id)/` in order to get information about the visit """ @api_route(r"/origins/", "api-1-origins") @api_doc("/origins/", noargs=True) @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origins(request): """ .. http:get:: /api/1/origins/ Get list of archived software origins. .. warning:: This endpoint used to provide an ``origin_from`` query parameter, and guarantee an order on results. This is no longer true, and only the Link header should be used for paginating through results. :query int origin_count: The maximum number of origins to return (default to 100, can not exceed 10000) {return_origin_array} {common_headers} {resheader_link} :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origins?origin_count=500` """ old_param_origin_from = request.query_params.get("origin_from") if old_param_origin_from: raise BadInputExc("Please use the Link header to browse through result") page_token = request.query_params.get("page_token", None) limit = min(int(request.query_params.get("origin_count", "100")), 10000) page_result = archive.lookup_origins(page_token, limit) origins = [enrich_origin(o, request=request) for o in page_result.results] next_page_token = page_result.next_page_token response = {"results": origins, "headers": {}} if next_page_token is not None: response["headers"]["link-next"] = reverse( "api-1-origins", query_params={"page_token": next_page_token, "origin_count": limit}, request=request, ) return response @api_route(r"/origin/(?P.+)/get/", "api-1-origin") @api_doc("/origin/") @format_docstring(return_origin=DOC_RETURN_ORIGIN) def api_origin(request, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/get/ Get information about a software origin. :param string origin_url: the origin url {return_origin} {common_headers} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/python/cpython/get/` """ ori_dict = {"url": origin_url} error_msg = "Origin with url %s not found." % ori_dict["url"] return api_lookup( archive.lookup_origin, ori_dict, notfound_msg=error_msg, enrich_fn=enrich_origin, request=request, ) @api_route( r"/origin/search/(?P.+)/", "api-1-origin-search", throttle_scope="swh_api_origin_search", ) @api_doc("/origin/search/") @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origin_search(request, url_pattern): """ .. http:get:: /api/1/origin/search/(url_pattern)/ Search for software origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. .. warning:: This endpoint used to provide an ``offset`` query parameter, and guarantee an order on results. This is no longer true, and only the Link header should be used for paginating through results. :param string url_pattern: a string pattern :query boolean use_ql: whether to use swh search query language or not :query int limit: the maximum number of found origins to return (bounded to 1000) :query boolean with_visit: if true, only return origins with at least one visit by Software heritage {return_origin_array} {common_headers} {resheader_link} :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/search/python/?limit=2` """ result = {} limit = min(int(request.query_params.get("limit", "70")), 1000) page_token = request.query_params.get("page_token") use_ql = request.query_params.get("use_ql", "false") with_visit = request.query_params.get("with_visit", "false") visit_type = request.query_params.get("visit_type") try: (results, page_token) = api_lookup( archive.search_origin, url_pattern, bool(strtobool(use_ql)), limit, bool(strtobool(with_visit)), [visit_type] if visit_type else None, page_token, enrich_fn=enrich_origin_search_result, request=request, ) except SearchQuerySyntaxError as e: raise BadInputExc(f"Syntax error in search query: {e.args[0]}") if page_token is not None: query_params = {k: v for (k, v) in request.GET.dict().items()} query_params["page_token"] = page_token result["headers"] = { "link-next": reverse( "api-1-origin-search", url_args={"url_pattern": url_pattern}, query_params=query_params, request=request, ) } result.update({"results": results}) return result @api_route(r"/origin/metadata-search/", "api-1-origin-metadata-search") @api_doc("/origin/metadata-search/", noargs=True) @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origin_metadata_search(request): """ .. http:get:: /api/1/origin/metadata-search/ Search for software origins whose metadata (expressed as a JSON-LD/CodeMeta dictionary) match the provided criteria. For now, only full-text search on this dictionary is supported. :query str fulltext: a string that will be matched against origin metadata; results are ranked and ordered starting with the best ones. :query int limit: the maximum number of found origins to return (bounded to 100) {return_origin_array} {common_headers} :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/metadata-search/?limit=2&fulltext=Jane%20Doe` """ fulltext = request.query_params.get("fulltext", None) limit = min(int(request.query_params.get("limit", "70")), 100) if not fulltext: content = '"fulltext" must be provided and non-empty.' raise BadInputExc(content) results = api_lookup( archive.search_origin_metadata, fulltext, limit, request=request ) return { "results": results, } @api_route(r"/origin/(?P.*)/visits/", "api-1-origin-visits") @api_doc("/origin/visits/") @format_docstring(return_origin_visit_array=DOC_RETURN_ORIGIN_VISIT_ARRAY) def api_origin_visits(request, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/visits/ Get information about all visits of a software origin. Visits are returned sorted in descending order according to their date. :param str origin_url: a software origin URL :query int per_page: specify the number of visits to list, for pagination purposes :query int last_visit: visit to start listing from, for pagination purposes {common_headers} {resheader_link} {return_origin_visit_array} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visits/` """ result = {} origin_query = {"url": origin_url} notfound_msg = "No origin {} found".format(origin_url) url_args_next = {"origin_url": origin_url} per_page = int(request.query_params.get("per_page", "10")) last_visit = request.query_params.get("last_visit") if last_visit: last_visit = int(last_visit) def _lookup_origin_visits(origin_query, last_visit=last_visit, per_page=per_page): all_visits = get_origin_visits(origin_query) all_visits.reverse() visits = [] if not last_visit: visits = all_visits[:per_page] else: for i, v in enumerate(all_visits): if v["visit"] == last_visit: visits = all_visits[i + 1 : i + 1 + per_page] break for v in visits: yield v results = api_lookup( _lookup_origin_visits, origin_query, notfound_msg=notfound_msg, enrich_fn=partial( enrich_origin_visit, with_origin_link=False, with_origin_visit_link=True ), request=request, ) if results: nb_results = len(results) if nb_results == per_page: new_last_visit = results[-1]["visit"] query_params = {} query_params["last_visit"] = new_last_visit if request.query_params.get("per_page"): query_params["per_page"] = per_page result["headers"] = { "link-next": reverse( "api-1-origin-visits", url_args=url_args_next, query_params=query_params, request=request, ) } result.update({"results": results}) return result @api_route( r"/origin/(?P.*)/visit/latest/", "api-1-origin-visit-latest", throttle_scope="swh_api_origin_visit_latest", ) @api_doc("/origin/visit/latest/") @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) def api_origin_visit_latest(request, origin_url=None): """ .. http:get:: /api/1/origin/(origin_url)/visit/latest/ Get information about the latest visit of a software origin. :param str origin_url: a software origin URL :query boolean require_snapshot: if true, only return a visit with a snapshot {common_headers} {return_origin_visit} :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visit/latest/` """ require_snapshot = request.query_params.get("require_snapshot", "false") return api_lookup( archive.lookup_origin_visit_latest, origin_url, bool(strtobool(require_snapshot)), notfound_msg=("No visit for origin {} found".format(origin_url)), enrich_fn=partial( enrich_origin_visit, with_origin_link=True, with_origin_visit_link=False ), request=request, ) @api_route( r"/origin/(?P.*)/visit/(?P[0-9]+)/", "api-1-origin-visit" ) @api_doc("/origin/visit/") @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) def api_origin_visit(request, visit_id, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/visit/(visit_id)/ Get information about a specific visit of a software origin. :param str origin_url: a software origin URL :param int visit_id: a visit identifier {common_headers} {return_origin_visit} :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/hylang/hy/visit/1/` """ return api_lookup( archive.lookup_origin_visit, origin_url, int(visit_id), notfound_msg=("No visit {} for origin {} found".format(visit_id, origin_url)), enrich_fn=partial( enrich_origin_visit, with_origin_link=True, with_origin_visit_link=False ), request=request, ) @api_route( - r"/origin/(?P.+)" "/intrinsic-metadata", "api-origin-intrinsic-metadata" + r"/origin/(?P.+)/intrinsic-metadata/", "api-origin-intrinsic-metadata" ) @api_doc("/origin/intrinsic-metadata/") @format_docstring() def api_origin_intrinsic_metadata(request, origin_url): """ .. http:get:: /api/1/origin/(origin_url)/intrinsic-metadata Get intrinsic metadata of a software origin (as a JSON-LD/CodeMeta dictionary). :param string origin_url: the origin url :>json string ???: intrinsic metadata field of the origin {common_headers} :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/https://github.com/python/cpython/intrinsic-metadata` """ return api_lookup( archive.lookup_origin_intrinsic_metadata, origin_url, notfound_msg=f"Origin with url {origin_url} not found", enrich_fn=enrich_origin, request=request, ) diff --git a/swh/web/auth/mailmap.py b/swh/web/auth/mailmap.py index d601a8ff..f4f628c0 100644 --- a/swh/web/auth/mailmap.py +++ b/swh/web/auth/mailmap.py @@ -1,119 +1,119 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from django.conf.urls import url from django.db import IntegrityError from django.db.models import Q from django.http.response import ( HttpResponse, HttpResponseBadRequest, HttpResponseForbidden, HttpResponseNotFound, ) from rest_framework import serializers from rest_framework.decorators import api_view from rest_framework.request import Request from rest_framework.response import Response from swh.web.auth.models import UserMailmap, UserMailmapEvent from swh.web.auth.utils import MAILMAP_PERMISSION class UserMailmapSerializer(serializers.ModelSerializer): class Meta: model = UserMailmap fields = "__all__" @api_view(["GET"]) def profile_list_mailmap(request: Request) -> HttpResponse: if not request.user.has_perm(MAILMAP_PERMISSION): return HttpResponseForbidden() mms = UserMailmap.objects.filter(user_id=str(request.user.id),).all() return Response(UserMailmapSerializer(mms, many=True).data) @api_view(["POST"]) def profile_add_mailmap(request: Request) -> HttpResponse: if not request.user.has_perm(MAILMAP_PERMISSION): return HttpResponseForbidden() event = UserMailmapEvent.objects.create( user_id=str(request.user.id), request_type="add", request=json.dumps(request.data), ) from_email = request.data.pop("from_email", None) if not from_email: return HttpResponseBadRequest("'from_email' must be provided and non-empty.") try: UserMailmap.objects.create( user_id=str(request.user.id), from_email=from_email, **request.data ) except IntegrityError as e: if "user_mailmap_from_email_key" in e.args[0]: return HttpResponseBadRequest("This 'from_email' already exists.") else: raise event.successful = True event.save() mm = UserMailmap.objects.get(user_id=str(request.user.id), from_email=from_email) return Response(UserMailmapSerializer(mm).data) @api_view(["POST"]) def profile_update_mailmap(request: Request) -> HttpResponse: if not request.user.has_perm(MAILMAP_PERMISSION): return HttpResponseForbidden() event = UserMailmapEvent.objects.create( user_id=str(request.user.id), request_type="update", request=json.dumps(request.data), ) from_email = request.data.pop("from_email", None) if not from_email: return HttpResponseBadRequest("'from_email' must be provided and non-empty.") user_id = str(request.user.id) try: to_update = ( UserMailmap.objects.filter(Q(user_id__isnull=True) | Q(user_id=user_id)) .filter(from_email=from_email) .get() ) except UserMailmap.DoesNotExist: return HttpResponseNotFound() for attr, value in request.data.items(): setattr(to_update, attr, value) to_update.save() event.successful = True event.save() mm = UserMailmap.objects.get(user_id=user_id, from_email=from_email) return Response(UserMailmapSerializer(mm).data) urlpatterns = [ - url(r"^profile/mailmap/list$", profile_list_mailmap, name="profile-mailmap-list",), - url(r"^profile/mailmap/add$", profile_add_mailmap, name="profile-mailmap-add",), + url(r"^profile/mailmap/list/$", profile_list_mailmap, name="profile-mailmap-list",), + url(r"^profile/mailmap/add/$", profile_add_mailmap, name="profile-mailmap-add",), url( - r"^profile/mailmap/update$", + r"^profile/mailmap/update/$", profile_update_mailmap, name="profile-mailmap-update", ), ] diff --git a/swh/web/browse/views/content.py b/swh/web/browse/views/content.py index f4152747..97897595 100644 --- a/swh/web/browse/views/content.py +++ b/swh/web/browse/views/content.py @@ -1,444 +1,444 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import difflib from distutils.util import strtobool import sentry_sdk from django.http import HttpResponse, JsonResponse from django.shortcuts import redirect, render from swh.model.hashutil import hash_to_hex from swh.model.swhids import ObjectType from swh.web.browse.browseurls import browse_route from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import ( content_display_max_size, gen_link, prepare_content_for_display, request_content, ) from swh.web.common import archive, highlightjs, query from swh.web.common.exc import BadInputExc, NotFoundExc, http_status_code_message from swh.web.common.identifiers import get_swhids_info from swh.web.common.typing import ContentMetadata, SWHObjectInfo from swh.web.common.utils import gen_path_info, reverse, swh_object_icons @browse_route( r"content/(?P[0-9a-z_:]*[0-9a-f]+.)/raw/", view_name="browse-content-raw", checksum_args=["query_string"], ) def content_raw(request, query_string): """Django view that produces a raw display of a content identified by its hash value. The url that points to it is :http:get:`/browse/content/[(algo_hash):](hash)/raw/` """ re_encode = bool(strtobool(request.GET.get("re_encode", "false"))) algo, checksum = query.parse_hash(query_string) checksum = hash_to_hex(checksum) content_data = request_content(query_string, max_size=None, re_encode=re_encode) filename = request.GET.get("filename", None) if not filename: filename = "%s_%s" % (algo, checksum) if ( content_data["mimetype"].startswith("text/") or content_data["mimetype"] == "inode/x-empty" ): response = HttpResponse(content_data["raw_data"], content_type="text/plain") response["Content-disposition"] = "filename=%s" % filename else: response = HttpResponse( content_data["raw_data"], content_type="application/octet-stream" ) response["Content-disposition"] = "attachment; filename=%s" % filename return response _auto_diff_size_limit = 20000 @browse_route( - r"content/(?P.*)/diff/(?P.*)", + r"content/(?P.*)/diff/(?P.*)/", view_name="diff-contents", ) def _contents_diff(request, from_query_string, to_query_string): """ Browse endpoint used to compute unified diffs between two contents. Diffs are generated only if the two contents are textual. By default, diffs whose size are greater than 20 kB will not be generated. To force the generation of large diffs, the 'force' boolean query parameter must be used. Args: request: input django http request from_query_string: a string of the form "[ALGO_HASH:]HASH" where optional ALGO_HASH can be either ``sha1``, ``sha1_git``, ``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH the hexadecimal representation of the hash value identifying the first content to_query_string: same as above for identifying the second content Returns: A JSON object containing the unified diff. """ diff_data = {} content_from = None content_to = None content_from_size = 0 content_to_size = 0 content_from_lines = [] content_to_lines = [] force = request.GET.get("force", "false") path = request.GET.get("path", None) language = "plaintext" force = bool(strtobool(force)) if from_query_string == to_query_string: diff_str = "File renamed without changes" else: try: text_diff = True if from_query_string: content_from = request_content(from_query_string, max_size=None) content_from_display_data = prepare_content_for_display( content_from["raw_data"], content_from["mimetype"], path ) language = content_from_display_data["language"] content_from_size = content_from["length"] if not ( content_from["mimetype"].startswith("text/") or content_from["mimetype"] == "inode/x-empty" ): text_diff = False if text_diff and to_query_string: content_to = request_content(to_query_string, max_size=None) content_to_display_data = prepare_content_for_display( content_to["raw_data"], content_to["mimetype"], path ) language = content_to_display_data["language"] content_to_size = content_to["length"] if not ( content_to["mimetype"].startswith("text/") or content_to["mimetype"] == "inode/x-empty" ): text_diff = False diff_size = abs(content_to_size - content_from_size) if not text_diff: diff_str = "Diffs are not generated for non textual content" language = "plaintext" elif not force and diff_size > _auto_diff_size_limit: diff_str = "Large diffs are not automatically computed" language = "plaintext" else: if content_from: content_from_lines = ( content_from["raw_data"].decode("utf-8").splitlines(True) ) if content_from_lines and content_from_lines[-1][-1] != "\n": content_from_lines[-1] += "[swh-no-nl-marker]\n" if content_to: content_to_lines = ( content_to["raw_data"].decode("utf-8").splitlines(True) ) if content_to_lines and content_to_lines[-1][-1] != "\n": content_to_lines[-1] += "[swh-no-nl-marker]\n" diff_lines = difflib.unified_diff(content_from_lines, content_to_lines) diff_str = "".join(list(diff_lines)[2:]) except Exception as exc: sentry_sdk.capture_exception(exc) diff_str = str(exc) diff_data["diff_str"] = diff_str diff_data["language"] = language return JsonResponse(diff_data) def _get_content_from_request(request): path = request.GET.get("path") if path is None: raise BadInputExc("The path query parameter must be provided.") snapshot = request.GET.get("snapshot") or request.GET.get("snapshot_id") origin_url = request.GET.get("origin_url") if snapshot is None and origin_url is None: raise BadInputExc( "The origin_url or snapshot query parameters must be provided." ) snapshot_context = get_snapshot_context( snapshot_id=snapshot, origin_url=origin_url, path=path, timestamp=request.GET.get("timestamp"), visit_id=request.GET.get("visit_id"), branch_name=request.GET.get("branch"), release_name=request.GET.get("release"), browse_context="content", ) root_directory = snapshot_context["root_directory"] return archive.lookup_directory_with_path(root_directory, path) @browse_route( r"content/(?P[0-9a-z_:]*[0-9a-f]+.)/", r"content/", view_name="browse-content", checksum_args=["query_string"], ) def content_display(request, query_string=None): """Django view that produces an HTML display of a content identified by its hash value. The URLs that points to it are :http:get:`/browse/content/[(algo_hash):](hash)/` :http:get:`/browse/content/` """ if query_string is None: # this case happens when redirected from origin/content or snapshot/content content = _get_content_from_request(request) return redirect( reverse( "browse-content", url_args={"query_string": f"sha1_git:{content['target']}"}, query_params=request.GET, ), ) algo, checksum = query.parse_hash(query_string) checksum = hash_to_hex(checksum) origin_url = request.GET.get("origin_url") selected_language = request.GET.get("language") if not origin_url: origin_url = request.GET.get("origin") snapshot_id = request.GET.get("snapshot") or request.GET.get("snapshot_id") path = request.GET.get("path") content_data = {} error_info = {"status_code": 200, "description": None} try: content_data = request_content(query_string) except NotFoundExc as e: error_info["status_code"] = 404 error_info["description"] = f"NotFoundExc: {str(e)}" snapshot_context = None if origin_url is not None or snapshot_id is not None: try: snapshot_context = get_snapshot_context( origin_url=origin_url, snapshot_id=snapshot_id, timestamp=request.GET.get("timestamp"), visit_id=request.GET.get("visit_id"), branch_name=request.GET.get("branch"), release_name=request.GET.get("release"), revision_id=request.GET.get("revision"), path=path, browse_context="content", ) except NotFoundExc as e: if str(e).startswith("Origin"): raw_cnt_url = reverse( "browse-content", url_args={"query_string": query_string} ) error_message = ( "The Software Heritage archive has a content " "with the hash you provided but the origin " "mentioned in your request appears broken: %s. " "Please check the URL and try again.\n\n" "Nevertheless, you can still browse the content " "without origin information: %s" % (gen_link(origin_url), gen_link(raw_cnt_url)) ) raise NotFoundExc(error_message) else: raise e content = None language = None mimetype = None if content_data.get("raw_data") is not None: content_display_data = prepare_content_for_display( content_data["raw_data"], content_data["mimetype"], path ) content = content_display_data["content_data"] language = content_display_data["language"] mimetype = content_display_data["mimetype"] # Override language with user-selected language if selected_language is not None: language = selected_language available_languages = None if mimetype and "text/" in mimetype: available_languages = highlightjs.get_supported_languages() filename = None path_info = None directory_id = None root_dir = None if snapshot_context: root_dir = snapshot_context.get("root_directory") query_params = snapshot_context["query_params"] if snapshot_context else {} breadcrumbs = [] if path: split_path = path.split("/") root_dir = root_dir or split_path[0] filename = split_path[-1] if root_dir != path: path = path.replace(root_dir + "/", "") path = path[: -len(filename)] path_info = gen_path_info(path) query_params.pop("path", None) dir_url = reverse( "browse-directory", url_args={"sha1_git": root_dir}, query_params=query_params, ) breadcrumbs.append({"name": root_dir[:7], "url": dir_url}) for pi in path_info: query_params["path"] = pi["path"] dir_url = reverse( "browse-directory", url_args={"sha1_git": root_dir}, query_params=query_params, ) breadcrumbs.append({"name": pi["name"], "url": dir_url}) breadcrumbs.append({"name": filename, "url": None}) if path and root_dir != path: dir_info = archive.lookup_directory_with_path(root_dir, path) directory_id = dir_info["target"] elif root_dir != path: directory_id = root_dir else: root_dir = None query_params = {"filename": filename} content_checksums = content_data.get("checksums", {}) content_url = reverse("browse-content", url_args={"query_string": query_string},) content_raw_url = reverse( "browse-content-raw", url_args={"query_string": query_string}, query_params=query_params, ) content_metadata = ContentMetadata( object_type=ObjectType.CONTENT, object_id=content_checksums.get("sha1_git"), sha1=content_checksums.get("sha1"), sha1_git=content_checksums.get("sha1_git"), sha256=content_checksums.get("sha256"), blake2s256=content_checksums.get("blake2s256"), content_url=content_url, mimetype=content_data.get("mimetype"), encoding=content_data.get("encoding"), size=content_data.get("length", 0), language=content_data.get("language"), root_directory=root_dir, path=f"/{path}" if path else None, filename=filename or "", directory=directory_id, revision=None, release=None, snapshot=None, origin_url=origin_url, ) swh_objects = [] if content_checksums: swh_objects.append( SWHObjectInfo( object_type=ObjectType.CONTENT, object_id=content_checksums.get("sha1_git"), ) ) if directory_id: swh_objects.append( SWHObjectInfo(object_type=ObjectType.DIRECTORY, object_id=directory_id) ) if snapshot_context: if snapshot_context["revision_id"]: swh_objects.append( SWHObjectInfo( object_type=ObjectType.REVISION, object_id=snapshot_context["revision_id"], ) ) swh_objects.append( SWHObjectInfo( object_type=ObjectType.SNAPSHOT, object_id=snapshot_context["snapshot_id"], ) ) if snapshot_context["release_id"]: swh_objects.append( SWHObjectInfo( object_type=ObjectType.RELEASE, object_id=snapshot_context["release_id"], ) ) swhids_info = get_swhids_info( swh_objects, snapshot_context, extra_context=content_metadata, ) heading = "Content - %s" % content_checksums.get("sha1_git") if breadcrumbs: content_path = "/".join([bc["name"] for bc in breadcrumbs]) heading += " - %s" % content_path return render( request, "browse/content.html", { "heading": heading, "swh_object_id": swhids_info[0]["swhid"] if swhids_info else "", "swh_object_name": "Content", "swh_object_metadata": content_metadata, "content": content, "content_size": content_data.get("length"), "max_content_size": content_display_max_size, "filename": filename, "encoding": content_data.get("encoding"), "mimetype": mimetype, "language": language, "available_languages": available_languages, "breadcrumbs": breadcrumbs, "top_right_link": { "url": content_raw_url, "icon": swh_object_icons["content"], "text": "Raw File", }, "snapshot_context": snapshot_context, "vault_cooking": None, "show_actions": True, "swhids_info": swhids_info, "error_code": error_info["status_code"], "error_message": http_status_code_message.get(error_info["status_code"]), "error_description": error_info["description"], }, status=error_info["status_code"], ) diff --git a/swh/web/misc/fundraising.py b/swh/web/misc/fundraising.py index 03c0683b..f35b0c3b 100644 --- a/swh/web/misc/fundraising.py +++ b/swh/web/misc/fundraising.py @@ -1,62 +1,62 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import requests from django.conf.urls import url from django.shortcuts import render from django.views.decorators.clickjacking import xframe_options_exempt from swh.web.config import get_config @xframe_options_exempt def fundraising_banner(request): config = get_config() public_key = config["give"]["public_key"] token = config["give"]["token"] give_api_forms_url = ( "https://www.softwareheritage.org/give-api/v1/forms/" f"?key={public_key}&token={token}&form=27047" ) donations_goal = 100 nb_donations = -1 try: fundraising_form = requests.get(give_api_forms_url).json().get("forms", []) if fundraising_form: nb_donations = int( fundraising_form[0] .get("stats", {}) .get("total", {}) .get("donations", -1) ) except Exception: pass goal_percent = int(nb_donations / donations_goal * 100) lang = request.GET.get("lang") return render( request, "misc/fundraising-banner.html", { "nb_donations": nb_donations, "donations_goal": donations_goal, "goal_percent": goal_percent, "lang": lang if lang else "en", "donation_form_url": ( "https://www.softwareheritage.org/donations/" "help-preserve-sourcecode-2021/" ), }, ) urlpatterns = [ - url(r"^fundraising/banner", fundraising_banner, name="swh-fundraising-banner"), + url(r"^fundraising/banner/$", fundraising_banner, name="swh-fundraising-banner"), ] diff --git a/swh/web/misc/iframe.py b/swh/web/misc/iframe.py index 31e0b744..df9c81bf 100644 --- a/swh/web/misc/iframe.py +++ b/swh/web/misc/iframe.py @@ -1,337 +1,337 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, List, Optional, Tuple from django.conf.urls import url from django.shortcuts import render from django.views.decorators.clickjacking import xframe_options_exempt from swh.model.hashutil import hash_to_bytes from swh.model.swhids import CoreSWHID, ObjectType, QualifiedSWHID from swh.web.browse.snapshot_context import get_snapshot_context from swh.web.browse.utils import ( content_display_max_size, get_directory_entries, prepare_content_for_display, request_content, ) from swh.web.common import archive from swh.web.common.exc import BadInputExc, NotFoundExc, http_status_code_message from swh.web.common.identifiers import get_swhid, get_swhids_info from swh.web.common.typing import SnapshotContext, SWHObjectInfo from swh.web.common.utils import gen_path_info, reverse def _get_content_rendering_data(cnt_swhid: QualifiedSWHID, path: str) -> Dict[str, Any]: content_data = request_content(f"sha1_git:{cnt_swhid.object_id.hex()}") content = None language = None mimetype = None if content_data.get("raw_data") is not None: content_display_data = prepare_content_for_display( content_data["raw_data"], content_data["mimetype"], path ) content = content_display_data["content_data"] language = content_display_data["language"] mimetype = content_display_data["mimetype"] return { "content": content, "content_size": content_data.get("length"), "max_content_size": content_display_max_size, "filename": path.split("/")[-1], "encoding": content_data.get("encoding"), "mimetype": mimetype, "language": language, } def _get_directory_rendering_data( dir_swhid: QualifiedSWHID, focus_swhid: QualifiedSWHID, path: str, ) -> Dict[str, Any]: dirs, files = get_directory_entries(dir_swhid.object_id.hex()) for d in dirs: if d["type"] == "rev": d["url"] = None else: dir_swhid = QualifiedSWHID( object_type=ObjectType.DIRECTORY, object_id=hash_to_bytes(d["target"]), origin=dir_swhid.origin, visit=dir_swhid.visit, anchor=dir_swhid.anchor, path=(path or "/") + d["name"] + "/", ) d["url"] = reverse( "swhid-iframe", url_args={"swhid": str(dir_swhid)}, query_params={"focus_swhid": str(focus_swhid)}, ) for f in files: object_id = hash_to_bytes(f["target"]) cnt_swhid = QualifiedSWHID( object_type=ObjectType.CONTENT, object_id=object_id, origin=dir_swhid.origin, visit=dir_swhid.visit, anchor=dir_swhid.anchor, path=(path or "/") + f["name"], lines=(focus_swhid.lines if object_id == focus_swhid.object_id else None), ) f["url"] = reverse( "swhid-iframe", url_args={"swhid": str(cnt_swhid)}, query_params={"focus_swhid": str(focus_swhid)}, ) return {"dirs": dirs, "files": files} def _get_breacrumbs_data( swhid: QualifiedSWHID, focus_swhid: QualifiedSWHID, path: str, snapshot_context: Optional[SnapshotContext] = None, ) -> Tuple[List[Dict[str, Any]], Optional[str]]: breadcrumbs = [] filename = None # strip any leading or trailing slash from path qualifier of SWHID if path and path[0] == "/": path = path[1:] if path and path[-1] == "/": path = path[:-1] if swhid.object_type == ObjectType.CONTENT: split_path = path.split("/") filename = split_path[-1] path = path[: -len(filename)] path_info = gen_path_info(path) if path != "/" else [] root_dir = None if snapshot_context and snapshot_context["root_directory"]: root_dir = snapshot_context["root_directory"] elif swhid.anchor and swhid.anchor.object_type == ObjectType.DIRECTORY: root_dir = swhid.anchor.object_id.hex() elif focus_swhid.object_type == ObjectType.DIRECTORY: root_dir = focus_swhid.object_id.hex() if root_dir: root_dir_swhid = QualifiedSWHID( object_type=ObjectType.DIRECTORY, object_id=hash_to_bytes(root_dir), origin=swhid.origin, visit=swhid.visit, anchor=swhid.anchor, ) breadcrumbs.append( { "name": root_dir[:7], "object_id": root_dir_swhid.object_id.hex(), "path": "/", "url": reverse( "swhid-iframe", url_args={"swhid": str(root_dir_swhid)}, query_params={ "focus_swhid": focus_swhid if focus_swhid != root_dir_swhid else None }, ), } ) for pi in path_info: dir_info = archive.lookup_directory_with_path(root_dir, pi["path"]) dir_swhid = QualifiedSWHID( object_type=ObjectType.DIRECTORY, object_id=hash_to_bytes(dir_info["target"]), origin=swhid.origin, visit=swhid.visit, anchor=swhid.anchor, path="/" + pi["path"] + "/", ) breadcrumbs.append( { "name": pi["name"], "object_id": dir_swhid.object_id.hex(), "path": dir_swhid.path.decode("utf-8") if dir_swhid.path else "", "url": reverse( "swhid-iframe", url_args={"swhid": str(dir_swhid)}, query_params={"focus_swhid": focus_swhid}, ), } ) if filename: breadcrumbs.append( { "name": filename, "object_id": swhid.object_id.hex(), "path": path, "url": "", } ) return breadcrumbs, root_dir @xframe_options_exempt def swhid_iframe(request, swhid: str): """Django view that can be embedded in an iframe to display objects archived by Software Heritage (currently contents and directories) in a minimalist Web UI. """ focus_swhid = request.GET.get("focus_swhid", swhid) parsed_swhid = None view_data = {} breadcrumbs: List[Dict[str, Any]] = [] swh_objects = [] snapshot_context = None swhids_info_extra_context = {} archive_link = None try: parsed_swhid = get_swhid(swhid) parsed_focus_swhid = get_swhid(focus_swhid) path = parsed_swhid.path.decode("utf-8") if parsed_swhid.path else "" snapshot_context = None revision_id = None if ( parsed_swhid.anchor and parsed_swhid.anchor.object_type == ObjectType.REVISION ): revision_id = parsed_swhid.anchor.object_id.hex() if parsed_swhid.origin or parsed_swhid.visit: snapshot_context = get_snapshot_context( origin_url=parsed_swhid.origin, snapshot_id=parsed_swhid.visit.object_id.hex() if parsed_swhid.visit else None, revision_id=revision_id, ) error_info: Dict[str, Any] = {"status_code": 200, "description": ""} if parsed_swhid and parsed_swhid.object_type == ObjectType.CONTENT: view_data = _get_content_rendering_data(parsed_swhid, path) swh_objects.append( SWHObjectInfo( object_type=ObjectType.CONTENT, object_id=parsed_swhid.object_id.hex(), ) ) elif parsed_swhid and parsed_swhid.object_type == ObjectType.DIRECTORY: view_data = _get_directory_rendering_data( parsed_swhid, parsed_focus_swhid, path ) swh_objects.append( SWHObjectInfo( object_type=ObjectType.DIRECTORY, object_id=parsed_swhid.object_id.hex(), ) ) elif parsed_swhid: error_info = { "status_code": 400, "description": ( f"Objects of type {parsed_swhid.object_type} are not supported" ), } swhids_info_extra_context["path"] = path if parsed_swhid and view_data: breadcrumbs, root_dir = _get_breacrumbs_data( parsed_swhid, parsed_focus_swhid, path, snapshot_context ) if parsed_swhid.object_type == ObjectType.CONTENT and len(breadcrumbs) > 1: swh_objects.append( SWHObjectInfo( object_type=ObjectType.DIRECTORY, object_id=breadcrumbs[-2]["object_id"], ) ) swhids_info_extra_context["path"] = breadcrumbs[-2]["path"] swhids_info_extra_context["filename"] = breadcrumbs[-1]["name"] if snapshot_context: swh_objects.append( SWHObjectInfo( object_type=ObjectType.REVISION, object_id=snapshot_context["revision_id"] or "", ) ) swh_objects.append( SWHObjectInfo( object_type=ObjectType.SNAPSHOT, object_id=snapshot_context["snapshot_id"] or "", ) ) archive_link = reverse("browse-swhid", url_args={"swhid": swhid}) if ( parsed_swhid.origin is None and parsed_swhid.visit is None and parsed_swhid.anchor is None and root_dir is not None ): # qualifier values cannot be used to get root directory from them, # we need to add it as anchor in the SWHID argument of the archive link root_dir_swhid = CoreSWHID( object_type=ObjectType.DIRECTORY, object_id=hash_to_bytes(root_dir) ) archive_swhid = QualifiedSWHID( object_type=parsed_swhid.object_type, object_id=parsed_swhid.object_id, path=parsed_swhid.path, anchor=root_dir_swhid, ) archive_link = reverse( "browse-swhid", url_args={"swhid": f"{archive_swhid}"}, ) except BadInputExc as e: error_info = {"status_code": 400, "description": f"BadInputExc: {str(e)}"} except NotFoundExc as e: error_info = {"status_code": 404, "description": f"NotFoundExc: {str(e)}"} except Exception as e: error_info = {"status_code": 500, "description": str(e)} return render( request, "misc/iframe.html", { **view_data, "iframe_mode": True, "object_type": parsed_swhid.object_type.value if parsed_swhid else None, "lines": parsed_swhid.lines if parsed_swhid else None, "breadcrumbs": breadcrumbs, "swhid": swhid, "focus_swhid": focus_swhid, "archive_link": archive_link, "error_code": error_info["status_code"], "error_message": http_status_code_message.get(error_info["status_code"]), "error_description": error_info["description"], "snapshot_context": None, "swhids_info": get_swhids_info( swh_objects, snapshot_context, swhids_info_extra_context ), }, status=error_info["status_code"], ) urlpatterns = [ url( - r"^embed/(?Pswh:[0-9]+:[a-z]+:[0-9a-f]+.*)$", + r"^embed/(?Pswh:[0-9]+:[a-z]+:[0-9a-f]+.*)/$", swhid_iframe, name="swhid-iframe", ), ] diff --git a/swh/web/misc/origin_save.py b/swh/web/misc/origin_save.py index db9f05f2..e66f00d2 100644 --- a/swh/web/misc/origin_save.py +++ b/swh/web/misc/origin_save.py @@ -1,99 +1,99 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.conf.urls import url from django.core.paginator import Paginator from django.db.models import Q from django.http import JsonResponse from django.shortcuts import render from swh.web.auth.utils import SWH_AMBASSADOR_PERMISSION, privileged_user from swh.web.common.models import SaveOriginRequest from swh.web.common.origin_save import ( get_savable_visit_types, get_save_origin_task_info, ) def _origin_save_view(request): return render( request, "misc/origin-save.html", { "heading": ("Request the saving of a software origin into the archive"), "visit_types": get_savable_visit_types( privileged_user(request, permissions=[SWH_AMBASSADOR_PERMISSION]) ), }, ) def _origin_save_requests_list(request, status): if status != "all": save_requests = SaveOriginRequest.objects.filter(status=status) else: save_requests = SaveOriginRequest.objects.all() table_data = {} table_data["recordsTotal"] = save_requests.count() table_data["draw"] = int(request.GET["draw"]) search_value = request.GET["search[value]"] column_order = request.GET["order[0][column]"] field_order = request.GET["columns[%s][name]" % column_order] order_dir = request.GET["order[0][dir]"] if order_dir == "desc": field_order = "-" + field_order save_requests = save_requests.order_by(field_order) length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 if search_value: save_requests = save_requests.filter( Q(status__icontains=search_value) | Q(loading_task_status__icontains=search_value) | Q(visit_type__icontains=search_value) | Q(origin_url__icontains=search_value) ) if ( int(request.GET.get("user_requests_only", "0")) and request.user.is_authenticated ): save_requests = save_requests.filter(user_ids__contains=f'"{request.user.id}"') table_data["recordsFiltered"] = save_requests.count() paginator = Paginator(save_requests, length) table_data["data"] = [sor.to_dict() for sor in paginator.page(page).object_list] return JsonResponse(table_data) def _save_origin_task_info(request, save_request_id): request_info = get_save_origin_task_info( save_request_id, full_info=request.user.is_staff ) for date_field in ("scheduled", "started", "ended"): if date_field in request_info and request_info[date_field] is not None: request_info[date_field] = request_info[date_field].isoformat() return JsonResponse(request_info) urlpatterns = [ url(r"^save/$", _origin_save_view, name="origin-save"), url( r"^save/requests/list/(?P.+)/$", _origin_save_requests_list, name="origin-save-requests-list", ), url( - r"^save/task/info/(?P.+)/", + r"^save/task/info/(?P.+)/$", _save_origin_task_info, name="origin-save-task-info", ), ] diff --git a/swh/web/misc/urls.py b/swh/web/misc/urls.py index b30ae214..8f81f17a 100644 --- a/swh/web/misc/urls.py +++ b/swh/web/misc/urls.py @@ -1,104 +1,104 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import requests import sentry_sdk from django.conf.urls import include, url from django.contrib.staticfiles import finders from django.http import JsonResponse from django.shortcuts import render from swh.web.common import archive from swh.web.config import get_config from swh.web.misc.metrics import prometheus_metrics def _jslicenses(request): jslicenses_file = finders.find("jssources/jslicenses.json") jslicenses_data = json.load(open(jslicenses_file)) jslicenses_data = sorted( jslicenses_data.items(), key=lambda item: item[0].split("/")[-1] ) return render(request, "misc/jslicenses.html", {"jslicenses_data": jslicenses_data}) def _stat_counters(request): stat_counters = archive.stat_counters() url = get_config()["history_counters_url"] stat_counters_history = {} try: response = requests.get(url, timeout=5) stat_counters_history = json.loads(response.text) except Exception as exc: sentry_sdk.capture_exception(exc) counters = { "stat_counters": stat_counters, "stat_counters_history": stat_counters_history, } return JsonResponse(counters) urlpatterns = [ url(r"^", include("swh.web.misc.coverage")), url(r"^jslicenses/$", _jslicenses, name="jslicenses"), url(r"^", include("swh.web.misc.origin_save")), - url(r"^stat_counters/", _stat_counters, name="stat-counters"), + url(r"^stat_counters/$", _stat_counters, name="stat-counters"), url(r"^", include("swh.web.misc.badges")), url(r"^metrics/prometheus/$", prometheus_metrics, name="metrics-prometheus"), url(r"^", include("swh.web.misc.iframe")), url(r"^", include("swh.web.misc.fundraising")), ] # when running end to end tests through cypress, declare some extra # endpoints to provide input data for some of those tests if get_config()["e2e_tests_mode"]: from swh.web.tests.views import ( get_content_code_data_all_exts, get_content_code_data_all_filenames, get_content_code_data_by_ext, get_content_code_data_by_filename, get_content_other_data_by_ext, ) urlpatterns.append( url( r"^tests/data/content/code/extension/(?P.+)/$", get_content_code_data_by_ext, name="tests-content-code-extension", ) ) urlpatterns.append( url( r"^tests/data/content/other/extension/(?P.+)/$", get_content_other_data_by_ext, name="tests-content-other-extension", ) ) urlpatterns.append( url( r"^tests/data/content/code/extensions/$", get_content_code_data_all_exts, name="tests-content-code-extensions", ) ) urlpatterns.append( url( r"^tests/data/content/code/filename/(?P.+)/$", get_content_code_data_by_filename, name="tests-content-code-filename", ) ) urlpatterns.append( url( r"^tests/data/content/code/filenames/$", get_content_code_data_all_filenames, name="tests-content-code-filenames", ) ) diff --git a/swh/web/tests/api/test_throttling.py b/swh/web/tests/api/test_throttling.py index 82ffa6a4..40ec2c58 100644 --- a/swh/web/tests/api/test_throttling.py +++ b/swh/web/tests/api/test_throttling.py @@ -1,230 +1,230 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from django.conf.urls import url from django.test.utils import override_settings from rest_framework.decorators import api_view from rest_framework.response import Response from rest_framework.views import APIView from swh.web.api.throttling import ( API_THROTTLING_EXEMPTED_PERM, SwhWebRateThrottle, SwhWebUserRateThrottle, throttle_scope, ) from swh.web.settings.tests import ( scope1_limiter_rate, scope1_limiter_rate_post, scope2_limiter_rate, scope2_limiter_rate_post, scope3_limiter_rate, scope3_limiter_rate_post, ) from swh.web.tests.utils import create_django_permission from swh.web.urls import urlpatterns class MockViewScope1(APIView): throttle_classes = (SwhWebRateThrottle,) throttle_scope = "scope1" def get(self, request): return Response("foo_get") def post(self, request): return Response("foo_post") @api_view(["GET", "POST"]) @throttle_scope("scope2") def mock_view_scope2(request): if request.method == "GET": return Response("bar_get") elif request.method == "POST": return Response("bar_post") class MockViewScope3(APIView): throttle_classes = (SwhWebRateThrottle,) throttle_scope = "scope3" def get(self, request): return Response("foo_get") def post(self, request): return Response("foo_post") @api_view(["GET", "POST"]) @throttle_scope("scope3") def mock_view_scope3(request): if request.method == "GET": return Response("bar_get") elif request.method == "POST": return Response("bar_post") urlpatterns += [ - url(r"^scope1_class$", MockViewScope1.as_view()), - url(r"^scope2_func$", mock_view_scope2), - url(r"^scope3_class$", MockViewScope3.as_view()), - url(r"^scope3_func$", mock_view_scope3), + url(r"^scope1_class/$", MockViewScope1.as_view()), + url(r"^scope2_func/$", mock_view_scope2), + url(r"^scope3_class/$", MockViewScope3.as_view()), + url(r"^scope3_func/$", mock_view_scope3), ] def check_response(response, status_code, limit=None, remaining=None): assert response.status_code == status_code if limit is not None: assert response["X-RateLimit-Limit"] == str(limit) else: assert "X-RateLimit-Limit" not in response if remaining is not None: assert response["X-RateLimit-Remaining"] == str(remaining) else: assert "X-RateLimit-Remaining" not in response @override_settings(ROOT_URLCONF=__name__) def test_scope1_requests_are_throttled(api_client): """ Ensure request rate is limited in scope1 """ for i in range(scope1_limiter_rate): - response = api_client.get("/scope1_class") + response = api_client.get("/scope1_class/") check_response(response, 200, scope1_limiter_rate, scope1_limiter_rate - i - 1) - response = api_client.get("/scope1_class") + response = api_client.get("/scope1_class/") check_response(response, 429, scope1_limiter_rate, 0) for i in range(scope1_limiter_rate_post): - response = api_client.post("/scope1_class") + response = api_client.post("/scope1_class/") check_response( response, 200, scope1_limiter_rate_post, scope1_limiter_rate_post - i - 1 ) - response = api_client.post("/scope1_class") + response = api_client.post("/scope1_class/") check_response(response, 429, scope1_limiter_rate_post, 0) @override_settings(ROOT_URLCONF=__name__) def test_scope2_requests_are_throttled(api_client): """ Ensure request rate is limited in scope2 """ for i in range(scope2_limiter_rate): - response = api_client.get("/scope2_func") + response = api_client.get("/scope2_func/") check_response(response, 200, scope2_limiter_rate, scope2_limiter_rate - i - 1) - response = api_client.get("/scope2_func") + response = api_client.get("/scope2_func/") check_response(response, 429, scope2_limiter_rate, 0) for i in range(scope2_limiter_rate_post): - response = api_client.post("/scope2_func") + response = api_client.post("/scope2_func/") check_response( response, 200, scope2_limiter_rate_post, scope2_limiter_rate_post - i - 1 ) - response = api_client.post("/scope2_func") + response = api_client.post("/scope2_func/") check_response(response, 429, scope2_limiter_rate_post, 0) @override_settings(ROOT_URLCONF=__name__) def test_scope3_requests_are_throttled_exempted(api_client): """ Ensure request rate is not limited in scope3 as requests coming from localhost are exempted from rate limit. """ for _ in range(scope3_limiter_rate + 1): - response = api_client.get("/scope3_class") + response = api_client.get("/scope3_class/") check_response(response, 200) for _ in range(scope3_limiter_rate_post + 1): - response = api_client.post("/scope3_class") + response = api_client.post("/scope3_class/") check_response(response, 200) for _ in range(scope3_limiter_rate + 1): - response = api_client.get("/scope3_func") + response = api_client.get("/scope3_func/") check_response(response, 200) for _ in range(scope3_limiter_rate_post + 1): - response = api_client.post("/scope3_func") + response = api_client.post("/scope3_func/") check_response(response, 200) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db def test_staff_users_are_not_rate_limited(api_client, staff_user): api_client.force_login(staff_user) for _ in range(scope2_limiter_rate + 1): - response = api_client.get("/scope2_func") + response = api_client.get("/scope2_func/") check_response(response, 200) for _ in range(scope2_limiter_rate_post + 1): - response = api_client.post("/scope2_func") + response = api_client.post("/scope2_func/") check_response(response, 200) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db def test_non_staff_users_are_rate_limited(api_client, regular_user): api_client.force_login(regular_user) scope2_limiter_rate_user = ( scope2_limiter_rate * SwhWebUserRateThrottle.NUM_REQUESTS_FACTOR ) for i in range(scope2_limiter_rate_user): - response = api_client.get("/scope2_func") + response = api_client.get("/scope2_func/") check_response( response, 200, scope2_limiter_rate_user, scope2_limiter_rate_user - i - 1 ) - response = api_client.get("/scope2_func") + response = api_client.get("/scope2_func/") check_response(response, 429, scope2_limiter_rate_user, 0) scope2_limiter_rate_post_user = ( scope2_limiter_rate_post * SwhWebUserRateThrottle.NUM_REQUESTS_FACTOR ) for i in range(scope2_limiter_rate_post_user): - response = api_client.post("/scope2_func") + response = api_client.post("/scope2_func/") check_response( response, 200, scope2_limiter_rate_post_user, scope2_limiter_rate_post_user - i - 1, ) - response = api_client.post("/scope2_func") + response = api_client.post("/scope2_func/") check_response(response, 429, scope2_limiter_rate_post_user, 0) @override_settings(ROOT_URLCONF=__name__) @pytest.mark.django_db def test_users_with_throttling_exempted_perm_are_not_rate_limited( api_client, regular_user ): regular_user.user_permissions.add( create_django_permission(API_THROTTLING_EXEMPTED_PERM) ) assert regular_user.has_perm(API_THROTTLING_EXEMPTED_PERM) api_client.force_login(regular_user) for _ in range(scope2_limiter_rate + 1): - response = api_client.get("/scope2_func") + response = api_client.get("/scope2_func/") check_response(response, 200) for _ in range(scope2_limiter_rate_post + 1): - response = api_client.post("/scope2_func") + response = api_client.post("/scope2_func/") check_response(response, 200) diff --git a/swh/web/tests/test_urls.py b/swh/web/tests/test_urls.py new file mode 100644 index 00000000..f7e664b2 --- /dev/null +++ b/swh/web/tests/test_urls.py @@ -0,0 +1,13 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from django.urls import get_resolver + + +def test_swh_web_urls_have_trailing_slash(): + urls = set(value[1] for value in get_resolver().reverse_dict.values()) + for url in urls: + if url != "$": + assert url.endswith("/$") diff --git a/swh/web/urls.py b/swh/web/urls.py index 705223b5..dde8db87 100644 --- a/swh/web/urls.py +++ b/swh/web/urls.py @@ -1,86 +1,86 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django_js_reverse.views import urls_js from django.conf import settings from django.conf.urls import ( handler400, handler403, handler404, handler500, include, url, ) from django.contrib.auth.views import LogoutView from django.contrib.staticfiles.views import serve from django.shortcuts import render from django.views.generic.base import RedirectView from swh.web.browse.identifiers import swhid_browse from swh.web.common.exc import ( swh_handle400, swh_handle403, swh_handle404, swh_handle500, ) from swh.web.common.utils import origin_visit_types from swh.web.config import get_config, is_feature_enabled swh_web_config = get_config() favicon_view = RedirectView.as_view( url="/static/img/icons/swh-logo-32x32.png", permanent=True ) def _default_view(request): return render(request, "homepage.html", {"visit_types": origin_visit_types()}) urlpatterns = [ url(r"^admin/", include("swh.web.admin.urls")), - url(r"^favicon\.ico$", favicon_view), + url(r"^favicon\.ico/$", favicon_view), url(r"^api/", include("swh.web.api.urls")), url(r"^browse/", include("swh.web.browse.urls")), url(r"^$", _default_view, name="swh-web-homepage"), url(r"^jsreverse/$", urls_js, name="js_reverse"), # keep legacy SWHID resolving URL with trailing slash for backward compatibility url( r"^(?P(swh|SWH):[0-9]+:[A-Za-z]+:[0-9A-Fa-f]+.*)/$", swhid_browse, name="browse-swhid-legacy", ), url( r"^(?P(swh|SWH):[0-9]+:[A-Za-z]+:[0-9A-Fa-f]+.*)$", swhid_browse, name="browse-swhid", ), url(r"^", include("swh.web.misc.urls")), url(r"^", include("swh.web.auth.views")), url(r"^logout/$", LogoutView.as_view(template_name="logout.html"), name="logout"), ] if is_feature_enabled("add_forge_now"): urlpatterns += (url(r"^", include("swh.web.add_forge_now.views")),) # allow to serve assets through django staticfiles # even if settings.DEBUG is False def insecure_serve(request, path, **kwargs): return serve(request, path, insecure=True, **kwargs) # enable to serve compressed assets through django development server if swh_web_config["serve_assets"]: - static_pattern = r"^%s(?P.*)$" % settings.STATIC_URL[1:] + static_pattern = r"^%s(?P.*)/$" % settings.STATIC_URL[1:] urlpatterns.append(url(static_pattern, insecure_serve)) handler400 = swh_handle400 # noqa handler403 = swh_handle403 # noqa handler404 = swh_handle404 # noqa handler500 = swh_handle500 # noqa