diff --git a/swh/web/add_forge_now/views.py b/swh/web/add_forge_now/views.py new file mode 100644 index 00000000..38a02691 --- /dev/null +++ b/swh/web/add_forge_now/views.py @@ -0,0 +1,87 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict + +from django.conf.urls import url +from django.core.paginator import Paginator +from django.db.models import Q +from django.http.request import HttpRequest +from django.http.response import HttpResponse, JsonResponse + +from swh.web.api.views.add_forge_now import ( + AddForgeNowRequestPublicSerializer, + AddForgeNowRequestSerializer, +) +from swh.web.auth.utils import ADD_FORGE_MODERATOR_PERMISSION + +from .models import Request as AddForgeRequest + + +def add_forge_request_list_datatables(request: HttpRequest) -> HttpResponse: + """Dedicated endpoint used by datatables to display the add-forge + requests in the Web UI. + """ + + draw = int(request.GET.get("draw", 0)) + + add_forge_requests = AddForgeRequest.objects.all() + + table_data: Dict[str, Any] = { + "recordsTotal": add_forge_requests.count(), + "draw": draw, + } + + search_value = request.GET.get("search[value]") + + column_order = request.GET.get("order[0][column]") + field_order = request.GET.get(f"columns[{column_order}][name]", "id") + order_dir = request.GET.get("order[0][dir]", "desc") + + if field_order: + if order_dir == "desc": + field_order = "-" + field_order + add_forge_requests = add_forge_requests.order_by(field_order) + + per_page = int(request.GET.get("length", 10)) + page_num = int(request.GET.get("start", 0)) // per_page + 1 + + if search_value: + add_forge_requests = add_forge_requests.filter( + Q(forge_type__icontains=search_value) + | Q(forge_url__icontains=search_value) + | Q(status__icontains=search_value) + ) + + if ( + int(request.GET.get("user_requests_only", "0")) + and request.user.is_authenticated + ): + add_forge_requests = add_forge_requests.filter( + submitter_name=request.user.username + ) + + paginator = Paginator(add_forge_requests, per_page) + page = paginator.page(page_num) + + if request.user.has_perm(ADD_FORGE_MODERATOR_PERMISSION): + requests = AddForgeNowRequestSerializer(page.object_list, many=True).data + else: + requests = AddForgeNowRequestPublicSerializer(page.object_list, many=True).data + + results = [dict(request) for request in requests] + + table_data["recordsFiltered"] = add_forge_requests.count() + table_data["data"] = results + return JsonResponse(table_data) + + +urlpatterns = [ + url( + r"^add-forge/request/list/datatables$", + add_forge_request_list_datatables, + name="add-forge-request-list-datatables", + ), +] diff --git a/swh/web/api/views/add_forge_now.py b/swh/web/api/views/add_forge_now.py index b83d9572..6c232c94 100644 --- a/swh/web/api/views/add_forge_now.py +++ b/swh/web/api/views/add_forge_now.py @@ -1,399 +1,355 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Union from django.core.exceptions import ObjectDoesNotExist from django.core.paginator import Paginator from django.db import transaction -from django.db.models import Q from django.forms import CharField, ModelForm from django.http import HttpResponseBadRequest from django.http.request import HttpRequest from django.http.response import HttpResponse, HttpResponseForbidden from rest_framework import serializers from rest_framework.request import Request from rest_framework.response import Response from swh.web.add_forge_now.models import Request as AddForgeRequest from swh.web.add_forge_now.models import RequestActorRole as AddForgeNowRequestActorRole from swh.web.add_forge_now.models import RequestHistory as AddForgeNowRequestHistory from swh.web.add_forge_now.models import RequestStatus as AddForgeNowRequestStatus from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route +from swh.web.auth.utils import ADD_FORGE_MODERATOR_PERMISSION from swh.web.common.exc import BadInputExc from swh.web.common.utils import reverse -MODERATOR_ROLE = "swh.web.add_forge_now.moderator" - def _block_while_testing(): """Replaced by tests to check concurrency behavior """ pass class AddForgeNowRequestForm(ModelForm): class Meta: model = AddForgeRequest fields = ( "forge_type", "forge_url", "forge_contact_email", "forge_contact_name", "forge_contact_comment", ) class AddForgeNowRequestHistoryForm(ModelForm): new_status = CharField(max_length=200, required=False,) class Meta: model = AddForgeNowRequestHistory fields = ("text", "new_status") class AddForgeNowRequestSerializer(serializers.ModelSerializer): class Meta: model = AddForgeRequest fields = "__all__" class AddForgeNowRequestPublicSerializer(serializers.ModelSerializer): """Serializes AddForgeRequest without private fields. """ class Meta: model = AddForgeRequest fields = ("id", "forge_url", "forge_type", "status", "submission_date") class AddForgeNowRequestHistorySerializer(serializers.ModelSerializer): class Meta: model = AddForgeNowRequestHistory exclude = ("request",) class AddForgeNowRequestHistoryPublicSerializer(serializers.ModelSerializer): class Meta: model = AddForgeNowRequestHistory fields = ("id", "date", "new_status", "actor_role") @api_route( r"/add-forge/request/create", "api-1-add-forge-request-create", methods=["POST"], ) @api_doc("/add-forge/request/create") @format_docstring() @transaction.atomic def api_add_forge_request_create(request: Union[HttpRequest, Request]) -> HttpResponse: """ .. http:post:: /api/1/add-forge/request/create/ Create a new request to add a forge to the list of those crawled regularly by Software Heritage. .. warning:: That endpoint is not publicly available and requires authentication in order to be able to request it. {common_headers} :[0-9]+)/update/", "api-1-add-forge-request-update", methods=["POST"], ) @api_doc("/add-forge/request/update", tags=["hidden"]) @format_docstring() @transaction.atomic def api_add_forge_request_update( request: Union[HttpRequest, Request], id: int ) -> HttpResponse: """ .. http:post:: /api/1/add-forge/request/update/ Update a request to add a forge to the list of those crawled regularly by Software Heritage. .. warning:: That endpoint is not publicly available and requires authentication in order to be able to request it. {common_headers} :[0-9]+)/get", "api-1-add-forge-request-get", methods=["GET"], ) @api_doc("/add-forge/request/get") @format_docstring() def api_add_forge_request_get(request: Request, id: int): """ .. http:get:: /api/1/add-forge/request/get/ Return all details about an add-forge request. {common_headers} :param int id: add-forge request identifier :statuscode 200: request details successfully returned :statuscode 400: request identifier does not exist """ try: add_forge_request = AddForgeRequest.objects.get(id=id) except ObjectDoesNotExist: raise BadInputExc("Request id does not exist") request_history = AddForgeNowRequestHistory.objects.filter( request=add_forge_request ).order_by("id") - if request.user.is_authenticated and request.user.has_perm(MODERATOR_ROLE): + if request.user.is_authenticated and request.user.has_perm( + ADD_FORGE_MODERATOR_PERMISSION + ): data = AddForgeNowRequestSerializer(add_forge_request).data history = AddForgeNowRequestHistorySerializer(request_history, many=True).data else: data = AddForgeNowRequestPublicSerializer(add_forge_request).data history = AddForgeNowRequestHistoryPublicSerializer( request_history, many=True ).data return {"request": data, "history": history} diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py index 341d0f06..c6ddba96 100644 --- a/swh/web/auth/utils.py +++ b/swh/web/auth/utils.py @@ -1,97 +1,98 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from base64 import urlsafe_b64encode from typing import List from cryptography.fernet import Fernet from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from django.http.request import HttpRequest OIDC_SWH_WEB_CLIENT_ID = "swh-web" SWH_AMBASSADOR_PERMISSION = "swh.ambassador" API_SAVE_ORIGIN_PERMISSION = "swh.web.api.save_origin" ADMIN_LIST_DEPOSIT_PERMISSION = "swh.web.admin.list_deposits" MAILMAP_PERMISSION = "swh.web.mailmap" +ADD_FORGE_MODERATOR_PERMISSION = "swh.web.add_forge_now.moderator" def _get_fernet(password: bytes, salt: bytes) -> Fernet: """ Instantiate a Fernet system from a password and a salt value (see https://cryptography.io/en/latest/fernet/). Args: password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The Fernet system """ kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend(), ) key = urlsafe_b64encode(kdf.derive(password)) return Fernet(key) def encrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes: """ Encrypt data using Fernet system (symmetric encryption). Args: data: input data to encrypt password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The encrypted data """ return _get_fernet(password, salt).encrypt(data) def decrypt_data(data: bytes, password: bytes, salt: bytes) -> bytes: """ Decrypt data using Fernet system (symmetric encryption). Args: data: input data to decrypt password: user password that will be used to generate a Fernet key derivation function salt: value that will be used to generate a Fernet key derivation function Returns: The decrypted data """ return _get_fernet(password, salt).decrypt(data) def privileged_user(request: HttpRequest, permissions: List[str] = []) -> bool: """Determine whether a user is authenticated and is a privileged one (e.g ambassador). This allows such user to have access to some more actions (e.g. bypass save code now review, access to 'archives' type...). A user is considered as privileged if he is a staff member or has any permission from those provided as parameters. Args: request: Input django HTTP request permissions: list of permission names to determine if user is privileged or not Returns: Whether the user is privileged or not. """ user = request.user return user.is_authenticated and ( user.is_staff or any([user.has_perm(perm) for perm in permissions]) ) diff --git a/swh/web/tests/add_forge_now/test_views.py b/swh/web/tests/add_forge_now/test_views.py new file mode 100644 index 00000000..843f5b37 --- /dev/null +++ b/swh/web/tests/add_forge_now/test_views.py @@ -0,0 +1,203 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json + +import pytest + +from swh.web.common.utils import reverse +from swh.web.tests.api.views.test_add_forge_now import create_add_forge_request +from swh.web.tests.utils import check_http_get_response + +NB_FORGE_TYPE = 2 +NB_FORGES_PER_TYPE = 20 + + +def create_add_forge_requests(client, regular_user, regular_user2): + requests = [] + for i in range(NB_FORGES_PER_TYPE): + request = { + "forge_type": "gitlab", + "forge_url": f"https://gitlab.example{i:02d}.org", + "forge_contact_email": f"admin@gitlab.example{i:02d}.org", + "forge_contact_name": f"gitlab.example{i:02d}.org admin", + "forge_contact_comment": "user marked as owner in forge members", + } + create_add_forge_request( + client, regular_user, data=request, + ) + requests.append(request) + + request = { + "forge_type": "gitea", + "forge_url": f"https://gitea.example{i:02d}.org", + "forge_contact_email": f"admin@gitea.example{i:02d}.org", + "forge_contact_name": f"gitea.example{i:02d}.org admin", + "forge_contact_comment": "user marked as owner in forge members", + } + create_add_forge_request( + client, regular_user2, data=request, + ) + requests.append(request) + return requests + + +@pytest.mark.django_db(transaction=True, reset_sequences=True) +def test_add_forge_request_list_datatables_no_parameters( + client, regular_user, regular_user2 +): + create_add_forge_requests(client, regular_user, regular_user2) + + url = reverse("add-forge-request-list-datatables") + resp = check_http_get_response(client, url, status_code=200) + data = json.loads(resp.content) + + length = 10 + assert data["draw"] == 0 + assert data["recordsFiltered"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert data["recordsTotal"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert len(data["data"]) == length + # default ordering is by descending id + assert data["data"][0]["id"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert data["data"][-1]["id"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - length + 1 + assert "submitter_name" not in data["data"][0] + + +@pytest.mark.django_db(transaction=True, reset_sequences=True) +def test_add_forge_request_list_datatables( + client, regular_user, regular_user2, add_forge_moderator +): + create_add_forge_requests(client, regular_user, regular_user2) + + length = 10 + + url = reverse( + "add-forge-request-list-datatables", + query_params={"draw": 1, "length": length, "start": 0}, + ) + + client.force_login(regular_user) + resp = check_http_get_response(client, url, status_code=200) + data = json.loads(resp.content) + + assert data["draw"] == 1 + assert data["recordsFiltered"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert data["recordsTotal"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert len(data["data"]) == length + # default ordering is by descending id + assert data["data"][0]["id"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert data["data"][-1]["id"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - length + 1 + assert "submitter_name" not in data["data"][0] + + client.force_login(add_forge_moderator) + resp = check_http_get_response(client, url, status_code=200) + data = json.loads(resp.content) + + assert data["draw"] == 1 + assert data["recordsFiltered"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert data["recordsTotal"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert len(data["data"]) == length + # default ordering is by descending id + assert data["data"][0]["id"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert data["data"][-1]["id"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - length + 1 + assert "submitter_name" in data["data"][0] + + +@pytest.mark.django_db(transaction=True, reset_sequences=True) +def test_add_forge_request_list_datatables_ordering( + client, regular_user, regular_user2 +): + requests = create_add_forge_requests(client, regular_user, regular_user2) + requests_sorted = list(sorted(requests, key=lambda d: d["forge_url"])) + forge_urls_asc = [request["forge_url"] for request in requests_sorted] + forge_urls_desc = list(reversed(forge_urls_asc)) + + length = 10 + + for direction in ("asc", "desc"): + for i in range(4): + url = reverse( + "add-forge-request-list-datatables", + query_params={ + "draw": 1, + "length": length, + "start": i * length, + "order[0][column]": 2, + "order[0][dir]": direction, + "columns[2][name]": "forge_url", + }, + ) + + client.force_login(regular_user) + resp = check_http_get_response(client, url, status_code=200) + data = json.loads(resp.content) + + assert data["draw"] == 1 + assert data["recordsFiltered"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert data["recordsTotal"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert len(data["data"]) == length + + page_forge_urls = [request["forge_url"] for request in data["data"]] + if direction == "asc": + expected_forge_urls = forge_urls_asc[i * length : (i + 1) * length] + else: + expected_forge_urls = forge_urls_desc[i * length : (i + 1) * length] + assert page_forge_urls == expected_forge_urls + + +@pytest.mark.django_db(transaction=True, reset_sequences=True) +def test_add_forge_request_list_datatables_search(client, regular_user, regular_user2): + create_add_forge_requests(client, regular_user, regular_user2) + + url = reverse( + "add-forge-request-list-datatables", + query_params={ + "draw": 1, + "length": NB_FORGES_PER_TYPE, + "start": 0, + "search[value]": "gitlab", + }, + ) + + client.force_login(regular_user) + resp = check_http_get_response(client, url, status_code=200) + data = json.loads(resp.content) + + assert data["draw"] == 1 + assert data["recordsFiltered"] == NB_FORGES_PER_TYPE + assert data["recordsTotal"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert len(data["data"]) == NB_FORGES_PER_TYPE + + page_forge_type = [request["forge_type"] for request in data["data"]] + assert page_forge_type == ["gitlab"] * NB_FORGES_PER_TYPE + + +@pytest.mark.django_db(transaction=True, reset_sequences=True) +def test_add_forge_request_list_datatables_user_requests( + client, regular_user, regular_user2 +): + create_add_forge_requests(client, regular_user, regular_user2) + + url = reverse( + "add-forge-request-list-datatables", + query_params={ + "draw": 1, + "length": NB_FORGES_PER_TYPE * NB_FORGE_TYPE, + "start": 0, + "user_requests_only": 1, + }, + ) + + client.force_login(regular_user2) + resp = check_http_get_response(client, url, status_code=200) + data = json.loads(resp.content) + + assert data["draw"] == 1 + assert data["recordsFiltered"] == NB_FORGES_PER_TYPE + assert data["recordsTotal"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE + assert len(data["data"]) == NB_FORGES_PER_TYPE + + page_forge_type = [request["forge_type"] for request in data["data"]] + assert page_forge_type == ["gitea"] * NB_FORGES_PER_TYPE diff --git a/swh/web/tests/api/views/test_add_forge_now.py b/swh/web/tests/api/views/test_add_forge_now.py index d47ba548..6b148852 100644 --- a/swh/web/tests/api/views/test_add_forge_now.py +++ b/swh/web/tests/api/views/test_add_forge_now.py @@ -1,633 +1,483 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import threading import time from urllib.parse import urlencode import iso8601 import pytest from swh.web.add_forge_now.models import Request -from swh.web.api.views.add_forge_now import MODERATOR_ROLE from swh.web.common.utils import reverse from swh.web.tests.utils import ( check_api_get_responses, check_api_post_response, check_http_post_response, - create_django_permission, ) @pytest.mark.django_db def test_add_forge_request_create_anonymous_user(api_client): url = reverse("api-1-add-forge-request-create") check_api_post_response(api_client, url, status_code=403) @pytest.mark.django_db def test_add_forge_request_create_empty(api_client, regular_user): api_client.force_login(regular_user) url = reverse("api-1-add-forge-request-create") resp = check_api_post_response(api_client, url, status_code=400) assert '"forge_type"' in resp.data["reason"] ADD_FORGE_DATA = { "forge_type": "gitlab", "forge_url": "https://gitlab.example.org", "forge_contact_email": "admin@gitlab.example.org", "forge_contact_name": "gitlab.example.org admin", "forge_contact_comment": "user marked as owner in forge members", } ADD_OTHER_FORGE_DATA = { "forge_type": "gitea", "forge_url": "https://gitea.example.org", "forge_contact_email": "admin@gitea.example.org", "forge_contact_name": "gitea.example.org admin", "forge_contact_comment": "user marked as owner in forge members", } @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_create_success(api_client, regular_user): api_client.force_login(regular_user) url = reverse("api-1-add-forge-request-create") date_before = datetime.datetime.now(tz=datetime.timezone.utc) resp = check_api_post_response( api_client, url, data=ADD_FORGE_DATA, status_code=201, ) date_after = datetime.datetime.now(tz=datetime.timezone.utc) assert resp.data == { **ADD_FORGE_DATA, "id": 1, "status": "PENDING", "submission_date": resp.data["submission_date"], "submitter_name": regular_user.username, "submitter_email": regular_user.email, } assert date_before < iso8601.parse_date(resp.data["submission_date"]) < date_after request = Request.objects.all()[0] assert request.forge_url == ADD_FORGE_DATA["forge_url"] assert request.submitter_name == regular_user.username @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_create_success_form_encoded(client, regular_user): client.force_login(regular_user) url = reverse("api-1-add-forge-request-create") date_before = datetime.datetime.now(tz=datetime.timezone.utc) resp = check_http_post_response( client, url, request_content_type="application/x-www-form-urlencoded", data=urlencode(ADD_FORGE_DATA), status_code=201, ) date_after = datetime.datetime.now(tz=datetime.timezone.utc) assert resp.data == { **ADD_FORGE_DATA, "id": 1, "status": "PENDING", "submission_date": resp.data["submission_date"], "submitter_name": regular_user.username, "submitter_email": regular_user.email, } assert date_before < iso8601.parse_date(resp.data["submission_date"]) < date_after request = Request.objects.all()[0] assert request.forge_url == ADD_FORGE_DATA["forge_url"] assert request.submitter_name == regular_user.username @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_create_duplicate(api_client, regular_user): api_client.force_login(regular_user) url = reverse("api-1-add-forge-request-create") check_api_post_response( api_client, url, data=ADD_FORGE_DATA, status_code=201, ) check_api_post_response( api_client, url, data=ADD_FORGE_DATA, status_code=409, ) requests = Request.objects.all() assert len(requests) == 1 -@pytest.fixture -def moderator_user(regular_user2): - regular_user2.user_permissions.add(create_django_permission(MODERATOR_ROLE)) - return regular_user2 - - @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_anonymous_user(api_client): url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response(api_client, url, status_code=403) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_regular_user(api_client, regular_user): api_client.force_login(regular_user) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response(api_client, url, status_code=403) @pytest.mark.django_db(transaction=True, reset_sequences=True) -def test_add_forge_request_update_non_existent(api_client, moderator_user): - api_client.force_login(moderator_user) +def test_add_forge_request_update_non_existent(api_client, add_forge_moderator): + api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response(api_client, url, status_code=400) -def _create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA): +def create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA): api_client.force_login(regular_user) url = reverse("api-1-add-forge-request-create") return check_api_post_response(api_client, url, data=data, status_code=201,) @pytest.mark.django_db(transaction=True, reset_sequences=True) -def test_add_forge_request_update_empty(api_client, regular_user, moderator_user): - _create_add_forge_request(api_client, regular_user) +def test_add_forge_request_update_empty(api_client, regular_user, add_forge_moderator): + create_add_forge_request(api_client, regular_user) - api_client.force_login(moderator_user) + api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response(api_client, url, status_code=400) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_missing_field( - api_client, regular_user, moderator_user + api_client, regular_user, add_forge_moderator ): - _create_add_forge_request(api_client, regular_user) + create_add_forge_request(api_client, regular_user) - api_client.force_login(moderator_user) + api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response(api_client, url, data={}, status_code=400) check_api_post_response( api_client, url, data={"new_status": "REJECTED"}, status_code=400 ) @pytest.mark.django_db(transaction=True, reset_sequences=True) -def test_add_forge_request_update(api_client, regular_user, moderator_user): - _create_add_forge_request(api_client, regular_user) +def test_add_forge_request_update(api_client, regular_user, add_forge_moderator): + create_add_forge_request(api_client, regular_user) - api_client.force_login(moderator_user) + api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response( api_client, url, data={"text": "updating request"}, status_code=200 ) check_api_post_response( api_client, url, data={"new_status": "REJECTED", "text": "request rejected"}, status_code=200, ) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_invalid_new_status( - api_client, regular_user, moderator_user + api_client, regular_user, add_forge_moderator ): - _create_add_forge_request(api_client, regular_user) + create_add_forge_request(api_client, regular_user) - api_client.force_login(moderator_user) + api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response( api_client, url, data={"new_status": "ACCEPTED", "text": "request accepted"}, status_code=400, ) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_status_concurrent( - api_client, regular_user, moderator_user, mocker + api_client, regular_user, add_forge_moderator, mocker ): _block_while_testing = mocker.patch( "swh.web.api.views.add_forge_now._block_while_testing" ) _block_while_testing.side_effect = lambda: time.sleep(1) - _create_add_forge_request(api_client, regular_user) + create_add_forge_request(api_client, regular_user) - api_client.force_login(moderator_user) + api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) worker_ended = False def worker(): nonlocal worker_ended check_api_post_response( api_client, url, data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"}, status_code=200, ) worker_ended = True # this thread will first modify the request status to WAITING_FOR_FEEDBACK thread = threading.Thread(target=worker) thread.start() # the other thread (slower) will attempt to modify the request status to REJECTED # but it will not be allowed as the first faster thread already modified it # and REJECTED state can not be reached from WAITING_FOR_FEEDBACK one time.sleep(0.5) check_api_post_response( api_client, url, data={"new_status": "REJECTED", "text": "request accepted"}, status_code=400, ) thread.join() assert worker_ended @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_list_anonymous(api_client, regular_user): url = reverse("api-1-add-forge-request-list") resp = check_api_get_responses(api_client, url, status_code=200) assert resp.data == [] - _create_add_forge_request(api_client, regular_user) + create_add_forge_request(api_client, regular_user) resp = check_api_get_responses(api_client, url, status_code=200) add_forge_request = { "forge_url": ADD_FORGE_DATA["forge_url"], "forge_type": ADD_FORGE_DATA["forge_type"], "status": "PENDING", "submission_date": resp.data[0]["submission_date"], "id": 1, } assert resp.data == [add_forge_request] - _create_add_forge_request(api_client, regular_user, data=ADD_OTHER_FORGE_DATA) + create_add_forge_request(api_client, regular_user, data=ADD_OTHER_FORGE_DATA) resp = check_api_get_responses(api_client, url, status_code=200) other_forge_request = { "forge_url": ADD_OTHER_FORGE_DATA["forge_url"], "forge_type": ADD_OTHER_FORGE_DATA["forge_type"], "status": "PENDING", "submission_date": resp.data[0]["submission_date"], "id": 2, } assert resp.data == [other_forge_request, add_forge_request] @pytest.mark.django_db(transaction=True, reset_sequences=True) -def test_add_forge_request_list_moderator(api_client, regular_user, moderator_user): +def test_add_forge_request_list_moderator( + api_client, regular_user, add_forge_moderator +): url = reverse("api-1-add-forge-request-list") - _create_add_forge_request(api_client, regular_user) - _create_add_forge_request(api_client, regular_user, data=ADD_OTHER_FORGE_DATA) + create_add_forge_request(api_client, regular_user) + create_add_forge_request(api_client, regular_user, data=ADD_OTHER_FORGE_DATA) - api_client.force_login(moderator_user) + api_client.force_login(add_forge_moderator) resp = check_api_get_responses(api_client, url, status_code=200) add_forge_request = { **ADD_FORGE_DATA, "status": "PENDING", "submission_date": resp.data[1]["submission_date"], "submitter_name": regular_user.username, "submitter_email": regular_user.email, "id": 1, } other_forge_request = { **ADD_OTHER_FORGE_DATA, "status": "PENDING", "submission_date": resp.data[0]["submission_date"], "submitter_name": regular_user.username, "submitter_email": regular_user.email, "id": 2, } assert resp.data == [other_forge_request, add_forge_request] @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_list_pagination( api_client, regular_user, api_request_factory ): - _create_add_forge_request(api_client, regular_user) - _create_add_forge_request(api_client, regular_user, data=ADD_OTHER_FORGE_DATA) + create_add_forge_request(api_client, regular_user) + create_add_forge_request(api_client, regular_user, data=ADD_OTHER_FORGE_DATA) url = reverse("api-1-add-forge-request-list", query_params={"per_page": 1}) resp = check_api_get_responses(api_client, url, 200) assert len(resp.data) == 1 request = api_request_factory.get(url) next_url = reverse( "api-1-add-forge-request-list", query_params={"page": 2, "per_page": 1}, request=request, ) assert resp["Link"] == f'<{next_url}>; rel="next"' resp = check_api_get_responses(api_client, next_url, 200) assert len(resp.data) == 1 prev_url = reverse( "api-1-add-forge-request-list", query_params={"page": 1, "per_page": 1}, request=request, ) assert resp["Link"] == f'<{prev_url}>; rel="previous"' @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_list_submitter_filtering( api_client, regular_user, regular_user2 ): - _create_add_forge_request(api_client, regular_user) - _create_add_forge_request(api_client, regular_user2, data=ADD_OTHER_FORGE_DATA) + create_add_forge_request(api_client, regular_user) + create_add_forge_request(api_client, regular_user2, data=ADD_OTHER_FORGE_DATA) api_client.force_login(regular_user) url = reverse( "api-1-add-forge-request-list", query_params={"user_requests_only": 1} ) resp = check_api_get_responses(api_client, url, status_code=200) assert len(resp.data) == 1 -NB_FORGE_TYPE = 2 -NB_FORGES_PER_TYPE = 20 - - -def _create_add_forge_requests(api_client, regular_user, regular_user2): - requests = [] - for i in range(NB_FORGES_PER_TYPE): - request = { - "forge_type": "gitlab", - "forge_url": f"https://gitlab.example{i:02d}.org", - "forge_contact_email": f"admin@gitlab.example{i:02d}.org", - "forge_contact_name": f"gitlab.example{i:02d}.org admin", - "forge_contact_comment": "user marked as owner in forge members", - } - _create_add_forge_request( - api_client, regular_user, data=request, - ) - requests.append(request) - - request = { - "forge_type": "gitea", - "forge_url": f"https://gitea.example{i:02d}.org", - "forge_contact_email": f"admin@gitea.example{i:02d}.org", - "forge_contact_name": f"gitea.example{i:02d}.org admin", - "forge_contact_comment": "user marked as owner in forge members", - } - _create_add_forge_request( - api_client, regular_user2, data=request, - ) - requests.append(request) - return requests - - -@pytest.mark.django_db(transaction=True, reset_sequences=True) -def test_add_forge_request_list_datatables( - api_client, regular_user, regular_user2, moderator_user -): - _create_add_forge_requests(api_client, regular_user, regular_user2) - - length = 10 - - url = reverse( - "api-1-add-forge-request-list", - query_params={"draw": 1, "length": length, "start": 0}, - ) - - api_client.force_login(regular_user) - resp = check_api_get_responses(api_client, url, status_code=200) - - assert resp.data["draw"] == 1 - assert resp.data["recordsFiltered"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - assert resp.data["recordsTotal"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - assert len(resp.data["data"]) == length - # default ordering is by descending id - assert resp.data["data"][0]["id"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - assert ( - resp.data["data"][-1]["id"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - length + 1 - ) - assert "submitter_name" not in resp.data["data"][0] - - api_client.force_login(moderator_user) - resp = check_api_get_responses(api_client, url, status_code=200) - - assert resp.data["draw"] == 1 - assert resp.data["recordsFiltered"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - assert resp.data["recordsTotal"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - assert len(resp.data["data"]) == length - # default ordering is by descending id - assert resp.data["data"][0]["id"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - assert ( - resp.data["data"][-1]["id"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - length + 1 - ) - assert "submitter_name" in resp.data["data"][0] - - -@pytest.mark.django_db(transaction=True, reset_sequences=True) -def test_add_forge_request_list_datatables_ordering( - api_client, regular_user, regular_user2, moderator_user -): - requests = _create_add_forge_requests(api_client, regular_user, regular_user2) - requests_sorted = list(sorted(requests, key=lambda d: d["forge_url"])) - forge_urls_asc = [request["forge_url"] for request in requests_sorted] - forge_urls_desc = list(reversed(forge_urls_asc)) - - length = 10 - - for direction in ("asc", "desc"): - for i in range(4): - url = reverse( - "api-1-add-forge-request-list", - query_params={ - "draw": 1, - "length": length, - "start": i * length, - "order[0][column]": 2, - "order[0][dir]": direction, - "columns[2][name]": "forge_url", - }, - ) - - api_client.force_login(regular_user) - resp = check_api_get_responses(api_client, url, status_code=200) - - assert resp.data["draw"] == 1 - assert resp.data["recordsFiltered"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - assert resp.data["recordsTotal"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - assert len(resp.data["data"]) == length - - page_forge_urls = [request["forge_url"] for request in resp.data["data"]] - if direction == "asc": - expected_forge_urls = forge_urls_asc[i * length : (i + 1) * length] - else: - expected_forge_urls = forge_urls_desc[i * length : (i + 1) * length] - assert page_forge_urls == expected_forge_urls - - -@pytest.mark.django_db(transaction=True, reset_sequences=True) -def test_add_forge_request_list_datatables_search( - api_client, regular_user, regular_user2, moderator_user -): - _create_add_forge_requests(api_client, regular_user, regular_user2) - - url = reverse( - "api-1-add-forge-request-list", - query_params={ - "draw": 1, - "length": NB_FORGES_PER_TYPE, - "start": 0, - "search[value]": "gitlab", - }, - ) - - api_client.force_login(regular_user) - resp = check_api_get_responses(api_client, url, status_code=200) - - assert resp.data["draw"] == 1 - assert resp.data["recordsFiltered"] == NB_FORGES_PER_TYPE - assert resp.data["recordsTotal"] == NB_FORGE_TYPE * NB_FORGES_PER_TYPE - assert len(resp.data["data"]) == NB_FORGES_PER_TYPE - - page_forge_type = [request["forge_type"] for request in resp.data["data"]] - assert page_forge_type == ["gitlab"] * NB_FORGES_PER_TYPE - - @pytest.mark.django_db(transaction=True, reset_sequences=True) -def test_add_forge_request_get(api_client, regular_user, moderator_user): - resp = _create_add_forge_request(api_client, regular_user) +def test_add_forge_request_get(api_client, regular_user, add_forge_moderator): + resp = create_add_forge_request(api_client, regular_user) submission_date = resp.data["submission_date"] url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) - api_client.force_login(moderator_user) + api_client.force_login(add_forge_moderator) check_api_post_response( api_client, url, data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"}, status_code=200, ) api_client.logout() url = reverse("api-1-add-forge-request-get", url_args={"id": 1}) resp = check_api_get_responses(api_client, url, status_code=200) assert resp.data == { "request": { "forge_url": ADD_FORGE_DATA["forge_url"], "forge_type": ADD_FORGE_DATA["forge_type"], "id": 1, "status": "WAITING_FOR_FEEDBACK", "submission_date": submission_date, }, "history": [ { "id": 1, "actor_role": "SUBMITTER", "date": resp.data["history"][0]["date"], "new_status": "PENDING", }, { "id": 2, "actor_role": "MODERATOR", "date": resp.data["history"][1]["date"], "new_status": "WAITING_FOR_FEEDBACK", }, ], } @pytest.mark.django_db(transaction=True, reset_sequences=True) -def test_add_forge_request_get_moderator(api_client, regular_user, moderator_user): - resp = _create_add_forge_request(api_client, regular_user) +def test_add_forge_request_get_moderator(api_client, regular_user, add_forge_moderator): + resp = create_add_forge_request(api_client, regular_user) submission_date = resp.data["submission_date"] url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) - api_client.force_login(moderator_user) + api_client.force_login(add_forge_moderator) check_api_post_response( api_client, url, data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"}, status_code=200, ) url = reverse("api-1-add-forge-request-get", url_args={"id": 1}) resp = check_api_get_responses(api_client, url, status_code=200) assert resp.data == { "request": { **ADD_FORGE_DATA, "id": 1, "status": "WAITING_FOR_FEEDBACK", "submission_date": submission_date, "submitter_name": regular_user.username, "submitter_email": regular_user.email, }, "history": [ { "id": 1, "text": "", "actor": regular_user.username, "actor_role": "SUBMITTER", "date": resp.data["history"][0]["date"], "new_status": "PENDING", }, { "id": 2, "text": "waiting for message", - "actor": moderator_user.username, + "actor": add_forge_moderator.username, "actor_role": "MODERATOR", "date": resp.data["history"][1]["date"], "new_status": "WAITING_FOR_FEEDBACK", }, ], } @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_get_invalid(api_client): url = reverse("api-1-add-forge-request-get", url_args={"id": 3}) check_api_get_responses(api_client, url, status_code=400) diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py index 6247f40c..e7097126 100644 --- a/swh/web/tests/conftest.py +++ b/swh/web/tests/conftest.py @@ -1,1216 +1,1226 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from datetime import timedelta import functools import json import os import random import shutil from subprocess import PIPE, run import sys import time from typing import Any, Dict, List, Optional from _pytest.python import Function from hypothesis import HealthCheck, settings import pytest from django.contrib.auth.models import User from django.core.cache import cache from django.test.utils import setup_databases # type: ignore from rest_framework.test import APIClient, APIRequestFactory from swh.model.hashutil import ( ALGORITHMS, DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex, ) from swh.model.model import Content, Directory from swh.model.swhids import ObjectType from swh.scheduler.tests.common import TASK_TYPES from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.algos.revisions_walker import get_revisions_walker from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest -from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID +from swh.web.auth.utils import ADD_FORGE_MODERATOR_PERMISSION, OIDC_SWH_WEB_CLIENT_ID from swh.web.common import converters from swh.web.common.origin_save import get_scheduler_load_task_types from swh.web.common.typing import OriginVisitInfo from swh.web.common.utils import browsers_supported_image_mimes from swh.web.config import get_config from swh.web.tests.data import ( get_tests_data, override_storages, random_content, random_sha1, random_sha256, ) +from swh.web.tests.utils import create_django_permission # Used to skip some tests ctags_json_missing = ( shutil.which("ctags") is None or b"+json" not in run(["ctags", "--version"], stdout=PIPE).stdout ) fossology_missing = shutil.which("nomossa") is None # Register some hypothesis profiles settings.register_profile("default", settings()) # we use getattr here to keep mypy happy regardless hypothesis version function_scoped_fixture_check = ( [getattr(HealthCheck, "function_scoped_fixture")] if hasattr(HealthCheck, "function_scoped_fixture") else [] ) suppress_health_check = [ HealthCheck.too_slow, HealthCheck.filter_too_much, ] + function_scoped_fixture_check settings.register_profile( "swh-web", settings(deadline=None, suppress_health_check=suppress_health_check,), ) settings.register_profile( "swh-web-fast", settings( deadline=None, max_examples=5, suppress_health_check=suppress_health_check, ), ) def pytest_addoption(parser): parser.addoption("--swh-web-random-seed", action="store", default=None) def pytest_configure(config): # Use fast hypothesis profile by default if none has been # explicitly specified in pytest option if config.getoption("--hypothesis-profile") is None: settings.load_profile("swh-web-fast") # Small hack in order to be able to run the unit tests # without static assets generated by webpack. # Those assets are not really needed for the Python tests # but the django templates will fail to load due to missing # generated file webpack-stats.json describing the js and css # files to include. # So generate a dummy webpack-stats.json file to overcome # that issue. test_dir = os.path.dirname(__file__) # location of the static folder when running tests through tox data_dir = os.path.join(sys.prefix, "share/swh/web") static_dir = os.path.join(data_dir, "static") if not os.path.exists(static_dir): # location of the static folder when running tests locally with pytest static_dir = os.path.join(test_dir, "../../../static") webpack_stats = os.path.join(static_dir, "webpack-stats.json") if os.path.exists(webpack_stats): return bundles_dir = os.path.join(test_dir, "../../../assets/src/bundles") if not os.path.exists(bundles_dir): # location of the bundles folder when running tests with tox bundles_dir = os.path.join(data_dir, "assets/src/bundles") _, bundles, _ = next(os.walk(bundles_dir)) mock_webpack_stats = { "status": "done", "publicPath": "/static", "chunks": {}, "assets": {}, } for bundle in bundles: asset = f"js/{bundle}.js" mock_webpack_stats["chunks"][bundle] = [asset] mock_webpack_stats["assets"][asset] = { "name": asset, "publicPath": f"/static/{asset}", } with open(webpack_stats, "w") as outfile: json.dump(mock_webpack_stats, outfile) _swh_web_custom_section = "swh-web custom section" _random_seed_cache_key = "swh-web/random-seed" @pytest.fixture(scope="function", autouse=True) def random_seed(pytestconfig): state = random.getstate() seed = pytestconfig.getoption("--swh-web-random-seed") if seed is None: seed = time.time() seed = int(seed) cache.set(_random_seed_cache_key, seed) random.seed(seed) yield seed random.setstate(state) def pytest_report_teststatus(report, *args): if report.when == "call" and report.outcome == "failed": seed = cache.get(_random_seed_cache_key, None) line = ( f'FAILED {report.nodeid}: Use "pytest --swh-web-random-seed={seed} ' f'{report.nodeid}" to reproduce that test failure with same inputs' ) report.sections.append((_swh_web_custom_section, line)) def pytest_terminal_summary(terminalreporter, *args): reports = terminalreporter.getreports("failed") content = os.linesep.join( text for report in reports for secname, text in report.sections if secname == _swh_web_custom_section ) if content: terminalreporter.ensure_newline() terminalreporter.section(_swh_web_custom_section, sep="-", blue=True, bold=True) terminalreporter.line(content) # Clear Django cache before each test @pytest.fixture(autouse=True) def django_cache_cleared(): cache.clear() # Alias rf fixture from pytest-django @pytest.fixture def request_factory(rf): return rf # Fixture to get test client from Django REST Framework @pytest.fixture def api_client(): return APIClient() # Fixture to get API request factory from Django REST Framework @pytest.fixture def api_request_factory(): return APIRequestFactory() # Initialize tests data @pytest.fixture(scope="function", autouse=True) def tests_data(): data = get_tests_data(reset=True) # Update swh-web configuration to use the in-memory storages # instantiated in the tests.data module override_storages( data["storage"], data["idx_storage"], data["search"], data["counters"] ) return data @pytest.fixture(scope="function") def sha1(): """Fixture returning a valid hexadecimal sha1 value. """ return random_sha1() @pytest.fixture(scope="function") def invalid_sha1(): """Fixture returning an invalid sha1 representation. """ return hash_to_hex(bytes(random.randint(0, 255) for _ in range(50))) @pytest.fixture(scope="function") def sha256(): """Fixture returning a valid hexadecimal sha256 value. """ return random_sha256() def _known_swh_objects(tests_data, object_type): return tests_data[object_type] @pytest.fixture(scope="function") def content(tests_data): """Fixture returning a random content ingested into the test archive. """ return random.choice(_known_swh_objects(tests_data, "contents")) @pytest.fixture(scope="function") def contents(tests_data): """Fixture returning random contents ingested into the test archive. """ return random.choices( _known_swh_objects(tests_data, "contents"), k=random.randint(2, 8) ) def _new_content(tests_data): while True: new_content = random_content() sha1_bytes = hash_to_bytes(new_content["sha1"]) if tests_data["storage"].content_get_data(sha1_bytes) is None: return new_content @pytest.fixture(scope="function") def unknown_content(tests_data): """Fixture returning a random content not ingested into the test archive. """ return _new_content(tests_data) @pytest.fixture(scope="function") def unknown_contents(tests_data): """Fixture returning random contents not ingested into the test archive. """ new_contents = [] new_content_ids = set() nb_contents = random.randint(2, 8) while len(new_contents) != nb_contents: new_content = _new_content(tests_data) if new_content["sha1"] not in new_content_ids: new_contents.append(new_content) new_content_ids.add(new_content["sha1"]) return list(new_contents) @pytest.fixture(scope="function") def empty_content(): """Fixture returning the empty content ingested into the test archive. """ empty_content = Content.from_data(data=b"").to_dict() for algo in DEFAULT_ALGORITHMS: empty_content[algo] = hash_to_hex(empty_content[algo]) return empty_content @functools.lru_cache(maxsize=None) def _content_text(): return list( filter( lambda c: c["mimetype"].startswith("text/"), _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_text(): """ Fixture returning a random textual content ingested into the test archive. """ return random.choice(_content_text()) @functools.lru_cache(maxsize=None) def _content_text_non_utf8(): return list( filter( lambda c: c["mimetype"].startswith("text/") and c["encoding"] not in ("utf-8", "us-ascii"), _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_text_non_utf8(): """Fixture returning a random textual content not encoded to UTF-8 ingested into the test archive. """ return random.choice(_content_text_non_utf8()) @functools.lru_cache(maxsize=None) def _content_application_no_highlight(): return list( filter( lambda c: c["mimetype"].startswith("application/") and c["hljs_language"] == "plaintext", _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_application_no_highlight(): """Fixture returning a random textual content with mimetype starting with application/ and no detected programming language to highlight ingested into the test archive. """ return random.choice(_content_application_no_highlight()) @functools.lru_cache(maxsize=None) def _content_text_no_highlight(): return list( filter( lambda c: c["mimetype"].startswith("text/") and c["hljs_language"] == "plaintext", _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_text_no_highlight(): """Fixture returning a random textual content with no detected programming language to highlight ingested into the test archive. """ return random.choice(_content_text_no_highlight()) @functools.lru_cache(maxsize=None) def _content_image_type(): return list( filter( lambda c: c["mimetype"] in browsers_supported_image_mimes, _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_image_type(): """Fixture returning a random image content ingested into the test archive. """ return random.choice(_content_image_type()) @functools.lru_cache(maxsize=None) def _content_unsupported_image_type_rendering(): return list( filter( lambda c: c["mimetype"].startswith("image/") and c["mimetype"] not in browsers_supported_image_mimes, _known_swh_objects(get_tests_data(), "contents"), ) ) @pytest.fixture(scope="function") def content_unsupported_image_type_rendering(): """Fixture returning a random image content ingested into the test archive that can not be rendered by browsers. """ return random.choice(_content_unsupported_image_type_rendering()) @functools.lru_cache(maxsize=None) def _content_utf8_detected_as_binary(): def utf8_binary_detected(content): if content["encoding"] != "binary": return False try: content["raw_data"].decode("utf-8") except Exception: return False else: return True return list( filter(utf8_binary_detected, _known_swh_objects(get_tests_data(), "contents")) ) @pytest.fixture(scope="function") def content_utf8_detected_as_binary(): """Fixture returning a random textual content detected as binary by libmagic while they are valid UTF-8 encoded files. """ return random.choice(_content_utf8_detected_as_binary()) @pytest.fixture(scope="function") def contents_with_ctags(): """ Fixture returning contents ingested into the test archive. Those contents are ctags compatible, that is running ctags on those lay results. """ return { "sha1s": [ "0ab37c02043ebff946c1937523f60aadd0844351", "15554cf7608dde6bfefac7e3d525596343a85b6f", "2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd", "30acd0b47fc25e159e27a980102ddb1c4bea0b95", "4f81f05aaea3efb981f9d90144f746d6b682285b", "5153aa4b6e4455a62525bc4de38ed0ff6e7dd682", "59d08bafa6a749110dfb65ba43a61963d5a5bf9f", "7568285b2d7f31ae483ae71617bd3db873deaa2c", "7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4", "8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03", "9b3557f1ab4111c8607a4f2ea3c1e53c6992916c", "9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd", "c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b", "e89e55a12def4cd54d5bff58378a3b5119878eb7", "e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e", "eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5", ], "symbol_name": "ABS", } @pytest.fixture(scope="function") def directory(tests_data): """Fixture returning a random directory ingested into the test archive. """ return random.choice(_known_swh_objects(tests_data, "directories")) @functools.lru_cache(maxsize=None) def _directory_with_entry_type(type_): tests_data = get_tests_data() return list( filter( lambda d: any( [ e["type"] == type_ for e in list(tests_data["storage"].directory_ls(hash_to_bytes(d))) ] ), _known_swh_objects(tests_data, "directories"), ) ) @pytest.fixture(scope="function") def directory_with_subdirs(): """Fixture returning a random directory containing sub directories ingested into the test archive. """ return random.choice(_directory_with_entry_type("dir")) @pytest.fixture(scope="function") def directory_with_files(): """Fixture returning a random directory containing at least one regular file. """ return random.choice(_directory_with_entry_type("file")) @pytest.fixture(scope="function") def unknown_directory(tests_data): """Fixture returning a random directory not ingested into the test archive. """ while True: new_directory = random_sha1() sha1_bytes = hash_to_bytes(new_directory) if list(tests_data["storage"].directory_missing([sha1_bytes])): return new_directory @pytest.fixture(scope="function") def empty_directory(): """Fixture returning the empty directory ingested into the test archive. """ return Directory(entries=()).id.hex() @pytest.fixture(scope="function") def revision(tests_data): """Fixturereturning a random revision ingested into the test archive. """ return random.choice(_known_swh_objects(tests_data, "revisions")) @pytest.fixture(scope="function") def revisions(tests_data): """Fixture returning random revisions ingested into the test archive. """ return random.choices( _known_swh_objects(tests_data, "revisions"), k=random.randint(2, 8), ) @pytest.fixture(scope="function") def revisions_list(tests_data): """Fixture returning random revisions ingested into the test archive. """ def gen_revisions_list(size): return random.choices(_known_swh_objects(tests_data, "revisions"), k=size,) return gen_revisions_list @pytest.fixture(scope="function") def unknown_revision(tests_data): """Fixture returning a random revision not ingested into the test archive. """ while True: new_revision = random_sha1() sha1_bytes = hash_to_bytes(new_revision) if tests_data["storage"].revision_get([sha1_bytes])[0] is None: return new_revision def _get_origin_dfs_revisions_walker(tests_data): storage = tests_data["storage"] origin = random.choice(tests_data["origins"][:-1]) snapshot = snapshot_get_latest(storage, origin["url"]) if snapshot.branches[b"HEAD"].target_type.value == "alias": target = snapshot.branches[b"HEAD"].target head = snapshot.branches[target].target else: head = snapshot.branches[b"HEAD"].target return get_revisions_walker("dfs", storage, head) @functools.lru_cache(maxsize=None) def _ancestor_revisions_data(): # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker(get_tests_data()) master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) if not init_rev_found: master_revisions.append(rev) if not rev["parents"]: init_rev_found = True return master_revisions, children @pytest.fixture(scope="function") def ancestor_revisions(): """Fixture returning a pair of revisions ingested into the test archive with an ancestor relation. """ master_revisions, children = _ancestor_revisions_data() # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions) - 1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev["id"]] return { "sha1_git_root": hash_to_hex(root_rev["id"]), "sha1_git": hash_to_hex(ancestor_rev["id"]), "children": [hash_to_hex(r) for r in ancestor_child_revs], } @functools.lru_cache(maxsize=None) def _non_ancestor_revisions_data(): # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker(get_tests_data()) merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev["parents"]) > 1: merge_revs.append(rev) for rev_p in rev["parents"]: children[rev_p].append(rev["id"]) return merge_revs, children @pytest.fixture(scope="function") def non_ancestor_revisions(): """Fixture returning a pair of revisions ingested into the test archive with no ancestor relation. """ merge_revs, children = _non_ancestor_revisions_data() # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev["parents"]): selected_revs = merge_rev["parents"] return { "sha1_git_root": hash_to_hex(selected_revs[0]), "sha1_git": hash_to_hex(selected_revs[1]), } @pytest.fixture(scope="function") def revision_with_submodules(): """Fixture returning a revision that is known to point to a directory with revision entries (aka git submodules) """ return { "rev_sha1_git": "ffcb69001f3f6745dfd5b48f72ab6addb560e234", "rev_dir_sha1_git": "d92a21446387fa28410e5a74379c934298f39ae2", "rev_dir_rev_path": "libtess2", } @pytest.fixture(scope="function") def release(tests_data): """Fixture returning a random release ingested into the test archive. """ return random.choice(_known_swh_objects(tests_data, "releases")) @pytest.fixture(scope="function") def releases(tests_data): """Fixture returning random releases ingested into the test archive. """ return random.choices( _known_swh_objects(tests_data, "releases"), k=random.randint(2, 8) ) @pytest.fixture(scope="function") def unknown_release(tests_data): """Fixture returning a random release not ingested into the test archive. """ while True: new_release = random_sha1() sha1_bytes = hash_to_bytes(new_release) if tests_data["storage"].release_get([sha1_bytes])[0] is None: return new_release @pytest.fixture(scope="function") def snapshot(tests_data): """Fixture returning a random snapshot ingested into the test archive. """ return random.choice(_known_swh_objects(tests_data, "snapshots")) @pytest.fixture(scope="function") def unknown_snapshot(tests_data): """Fixture returning a random snapshot not ingested into the test archive. """ while True: new_snapshot = random_sha1() sha1_bytes = hash_to_bytes(new_snapshot) if tests_data["storage"].snapshot_get_branches(sha1_bytes) is None: return new_snapshot @pytest.fixture(scope="function") def origin(tests_data): """Fixture returning a random origin ingested into the test archive. """ return random.choice(_known_swh_objects(tests_data, "origins")) @functools.lru_cache(maxsize=None) def _origin_with_multiple_visits(): tests_data = get_tests_data() origins = [] storage = tests_data["storage"] for origin in tests_data["origins"]: visit_page = storage.origin_visit_get(origin["url"]) if len(visit_page.results) > 1: origins.append(origin) return origins @pytest.fixture(scope="function") def origin_with_multiple_visits(): """Fixture returning a random origin with multiple visits ingested into the test archive. """ return random.choice(_origin_with_multiple_visits()) @functools.lru_cache(maxsize=None) def _origin_with_releases(): tests_data = get_tests_data() origins = [] for origin in tests_data["origins"]: snapshot = snapshot_get_latest(tests_data["storage"], origin["url"]) if any([b.target_type.value == "release" for b in snapshot.branches.values()]): origins.append(origin) return origins @pytest.fixture(scope="function") def origin_with_releases(): """Fixture returning a random origin with releases ingested into the test archive. """ return random.choice(_origin_with_releases()) @functools.lru_cache(maxsize=None) def _origin_with_pull_request_branches(): tests_data = get_tests_data() origins = [] storage = tests_data["storage"] for origin in storage.origin_list(limit=1000).results: snapshot = snapshot_get_latest(storage, origin.url) if any([b"refs/pull/" in b for b in snapshot.branches]): origins.append(origin) return origins @pytest.fixture(scope="function") def origin_with_pull_request_branches(): """Fixture returning a random origin with pull request branches ingested into the test archive. """ return random.choice(_origin_with_pull_request_branches()) @functools.lru_cache(maxsize=None) def _object_type_swhid(object_type): return list( filter( lambda swhid: swhid.object_type == object_type, _known_swh_objects(get_tests_data(), "swhids"), ) ) @pytest.fixture(scope="function") def content_swhid(): """Fixture returning a qualified SWHID for a random content object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.CONTENT)) @pytest.fixture(scope="function") def directory_swhid(): """Fixture returning a qualified SWHID for a random directory object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.DIRECTORY)) @pytest.fixture(scope="function") def release_swhid(): """Fixture returning a qualified SWHID for a random release object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.RELEASE)) @pytest.fixture(scope="function") def revision_swhid(): """Fixture returning a qualified SWHID for a random revision object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.REVISION)) @pytest.fixture(scope="function") def snapshot_swhid(): """Fixture returning a qualified SWHID for a snapshot object ingested into the test archive. """ return random.choice(_object_type_swhid(ObjectType.SNAPSHOT)) # Fixture to manipulate data from a sample archive used in the tests @pytest.fixture(scope="function") def archive_data(tests_data): return _ArchiveData(tests_data) # Fixture to manipulate indexer data from a sample archive used in the tests @pytest.fixture(scope="function") def indexer_data(tests_data): return _IndexerData(tests_data) # Custom data directory for requests_mock @pytest.fixture def datadir(): return os.path.join(os.path.abspath(os.path.dirname(__file__)), "resources") class _ArchiveData: """ Helper class to manage data from a sample test archive. It is initialized with a reference to an in-memory storage containing raw tests data. It is basically a proxy to Storage interface but it overrides some methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.storage = tests_data["storage"] def __getattr__(self, key): if key == "storage": raise AttributeError(key) # Forward calls to non overridden Storage methods to wrapped # storage instance return getattr(self.storage, key) def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]: cnt_ids_bytes = { algo_hash: hash_to_bytes(content[algo_hash]) for algo_hash in ALGORITHMS if content.get(algo_hash) } cnt = self.storage.content_find(cnt_ids_bytes) return converters.from_content(cnt[0].to_dict()) if cnt else cnt def content_get(self, cnt_id: str) -> Dict[str, Any]: cnt_id_bytes = hash_to_bytes(cnt_id) content = self.storage.content_get([cnt_id_bytes])[0] if content: content_d = content.to_dict() content_d.pop("ctime", None) else: content_d = None return converters.from_swh( content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"} ) def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]: cnt_id_bytes = hash_to_bytes(cnt_id) cnt_data = self.storage.content_get_data(cnt_id_bytes) if cnt_data is None: return None return converters.from_content({"data": cnt_data, "sha1": cnt_id_bytes}) def directory_get(self, dir_id): return {"id": dir_id, "content": self.directory_ls(dir_id)} def directory_ls(self, dir_id): cnt_id_bytes = hash_to_bytes(dir_id) dir_content = map( converters.from_directory_entry, self.storage.directory_ls(cnt_id_bytes) ) return list(dir_content) def release_get(self, rel_id: str) -> Optional[Dict[str, Any]]: rel_id_bytes = hash_to_bytes(rel_id) rel_data = self.storage.release_get([rel_id_bytes])[0] return converters.from_release(rel_data) if rel_data else None def revision_get(self, rev_id: str) -> Optional[Dict[str, Any]]: rev_id_bytes = hash_to_bytes(rev_id) rev_data = self.storage.revision_get([rev_id_bytes])[0] return converters.from_revision(rev_data) if rev_data else None def revision_log(self, rev_id, limit=None): rev_id_bytes = hash_to_bytes(rev_id) return list( map( converters.from_revision, self.storage.revision_log([rev_id_bytes], limit=limit), ) ) def snapshot_get_latest(self, origin_url): snp = snapshot_get_latest(self.storage, origin_url) return converters.from_snapshot(snp.to_dict()) def origin_get(self, origin_urls): origins = self.storage.origin_get(origin_urls) return [converters.from_origin(o.to_dict()) for o in origins] def origin_visit_get(self, origin_url): next_page_token = None visits = [] while True: visit_page = self.storage.origin_visit_get( origin_url, page_token=next_page_token ) next_page_token = visit_page.next_page_token for visit in visit_page.results: visit_status = self.storage.origin_visit_status_get_latest( origin_url, visit.visit ) visits.append( converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) ) if not next_page_token: break return visits def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo: visit = self.storage.origin_visit_get_by(origin_url, visit_id) assert visit is not None visit_status = self.storage.origin_visit_status_get_latest(origin_url, visit_id) assert visit_status is not None return converters.from_origin_visit( {**visit_status.to_dict(), "type": visit.type} ) def origin_visit_status_get_latest( self, origin_url, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ): visit_status = origin_get_latest_visit_status( self.storage, origin_url, type=type, allowed_statuses=allowed_statuses, require_snapshot=require_snapshot, ) return ( converters.from_origin_visit(visit_status.to_dict()) if visit_status else None ) def snapshot_get(self, snapshot_id): snp = snapshot_get_all_branches(self.storage, hash_to_bytes(snapshot_id)) return converters.from_snapshot(snp.to_dict()) def snapshot_get_branches( self, snapshot_id, branches_from="", branches_count=1000, target_types=None ): partial_branches = self.storage.snapshot_get_branches( hash_to_bytes(snapshot_id), branches_from.encode(), branches_count, target_types, ) return converters.from_partial_branches(partial_branches) def snapshot_get_head(self, snapshot): if snapshot["branches"]["HEAD"]["target_type"] == "alias": target = snapshot["branches"]["HEAD"]["target"] head = snapshot["branches"][target]["target"] else: head = snapshot["branches"]["HEAD"]["target"] return head def snapshot_count_branches(self, snapshot_id): counts = dict.fromkeys(("alias", "release", "revision"), 0) counts.update(self.storage.snapshot_count_branches(hash_to_bytes(snapshot_id))) counts.pop(None, None) return counts class _IndexerData: """ Helper class to manage indexer tests data It is initialized with a reference to an in-memory indexer storage containing raw tests data. It also defines class methods to retrieve those tests data in a json serializable format in order to ease tests implementation. """ def __init__(self, tests_data): self.idx_storage = tests_data["idx_storage"] self.mimetype_indexer = tests_data["mimetype_indexer"] self.license_indexer = tests_data["license_indexer"] self.ctags_indexer = tests_data["ctags_indexer"] def content_add_mimetype(self, cnt_id): self.mimetype_indexer.run([hash_to_bytes(cnt_id)]) def content_get_mimetype(self, cnt_id): mimetype = self.idx_storage.content_mimetype_get([hash_to_bytes(cnt_id)])[ 0 ].to_dict() return converters.from_filetype(mimetype) def content_add_license(self, cnt_id): self.license_indexer.run([hash_to_bytes(cnt_id)]) def content_get_license(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) licenses = self.idx_storage.content_fossology_license_get([cnt_id_bytes]) for license in licenses: yield converters.from_swh(license.to_dict(), hashess={"id"}) def content_add_ctags(self, cnt_id): self.ctags_indexer.run([hash_to_bytes(cnt_id)]) def content_get_ctags(self, cnt_id): cnt_id_bytes = hash_to_bytes(cnt_id) ctags = self.idx_storage.content_ctags_get([cnt_id_bytes]) for ctag in ctags: yield converters.from_swh(ctag, hashess={"id"}) @pytest.fixture def keycloak_oidc(keycloak_oidc, mocker): keycloak_config = get_config()["keycloak"] keycloak_oidc.server_url = keycloak_config["server_url"] keycloak_oidc.realm_name = keycloak_config["realm_name"] keycloak_oidc.client_id = OIDC_SWH_WEB_CLIENT_ID keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client") keycloak_oidc_client.return_value = keycloak_oidc return keycloak_oidc @pytest.fixture def subtest(request): """A hack to explicitly set up and tear down fixtures. This fixture allows you to set up and tear down fixtures within the test function itself. This is useful (necessary!) for using Hypothesis inside pytest, as hypothesis will call the test function multiple times, without setting up or tearing down fixture state as it is normally the case. Copied from the pytest-subtesthack project, public domain license (https://github.com/untitaker/pytest-subtesthack). """ parent_test = request.node def inner(func): if hasattr(Function, "from_parent"): item = Function.from_parent( parent_test, name=request.function.__name__ + "[]", originalname=request.function.__name__, callobj=func, ) else: item = Function( name=request.function.__name__ + "[]", parent=parent_test, callobj=func ) nextitem = parent_test # prevents pytest from tearing down module fixtures item.ihook.pytest_runtest_setup(item=item) item.ihook.pytest_runtest_call(item=item) item.ihook.pytest_runtest_teardown(item=item, nextitem=nextitem) return inner @pytest.fixture def swh_scheduler(swh_scheduler): config = get_config() scheduler = config["scheduler"] config["scheduler"] = swh_scheduler # create load-git and load-hg task types for task_type in TASK_TYPES.values(): # see https://forge.softwareheritage.org/rDSCHc46ffadf7adf24c7eb3ffce062e8ade3818c79cc # noqa task_type["type"] = task_type["type"].replace("load-test-", "load-", 1) swh_scheduler.create_task_type(task_type) # create load-svn task type swh_scheduler.create_task_type( { "type": "load-svn", "description": "Update a Subversion repository", "backend_name": "swh.loader.svn.tasks.DumpMountAndLoadSvnRepository", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # create load-cvs task type swh_scheduler.create_task_type( { "type": "load-cvs", "description": "Update a CVS repository", "backend_name": "swh.loader.cvs.tasks.DumpMountAndLoadSvnRepository", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # create load-bzr task type swh_scheduler.create_task_type( { "type": "load-bzr", "description": "Update a Bazaar repository", "backend_name": "swh.loader.bzr.tasks.LoadBazaar", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) # add method to add load-archive-files task type during tests def add_load_archive_task_type(): swh_scheduler.create_task_type( { "type": "load-archive-files", "description": "Load tarballs", "backend_name": "swh.loader.package.archive.tasks.LoadArchive", "default_interval": timedelta(days=64), "min_interval": timedelta(hours=12), "max_interval": timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": timedelta(hours=2), } ) swh_scheduler.add_load_archive_task_type = add_load_archive_task_type yield swh_scheduler config["scheduler"] = scheduler get_scheduler_load_task_types.cache_clear() @pytest.fixture(scope="session") def django_db_setup(request, django_db_blocker, postgresql_proc): from django.conf import settings settings.DATABASES["default"].update( { ("ENGINE", "django.db.backends.postgresql"), ("NAME", get_config()["test_db"]["name"]), ("USER", postgresql_proc.user), ("HOST", postgresql_proc.host), ("PORT", postgresql_proc.port), } ) with django_db_blocker.unblock(): setup_databases( verbosity=request.config.option.verbose, interactive=False, keepdb=False ) @pytest.fixture def staff_user(): return User.objects.create_user(username="admin", password="", is_staff=True) @pytest.fixture def regular_user(): return User.objects.create_user(username="johndoe", password="") @pytest.fixture def regular_user2(): return User.objects.create_user(username="janedoe", password="") + + +@pytest.fixture +def add_forge_moderator(): + moderator = User.objects.create_user(username="add-forge moderator", password="") + moderator.user_permissions.add( + create_django_permission(ADD_FORGE_MODERATOR_PERMISSION) + ) + return moderator diff --git a/swh/web/urls.py b/swh/web/urls.py index b840efa6..a4bf47ee 100644 --- a/swh/web/urls.py +++ b/swh/web/urls.py @@ -1,83 +1,84 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django_js_reverse.views import urls_js from django.conf import settings from django.conf.urls import ( handler400, handler403, handler404, handler500, include, url, ) from django.contrib.auth.views import LogoutView from django.contrib.staticfiles.views import serve from django.shortcuts import render from django.views.generic.base import RedirectView from swh.web.browse.identifiers import swhid_browse from swh.web.common.exc import ( swh_handle400, swh_handle403, swh_handle404, swh_handle500, ) from swh.web.common.utils import origin_visit_types from swh.web.config import get_config swh_web_config = get_config() favicon_view = RedirectView.as_view( url="/static/img/icons/swh-logo-32x32.png", permanent=True ) def _default_view(request): return render(request, "homepage.html", {"visit_types": origin_visit_types()}) urlpatterns = [ url(r"^admin/", include("swh.web.admin.urls")), url(r"^favicon\.ico$", favicon_view), url(r"^api/", include("swh.web.api.urls")), url(r"^browse/", include("swh.web.browse.urls")), url(r"^$", _default_view, name="swh-web-homepage"), url(r"^jsreverse/$", urls_js, name="js_reverse"), # keep legacy SWHID resolving URL with trailing slash for backward compatibility url( r"^(?P(swh|SWH):[0-9]+:[A-Za-z]+:[0-9A-Fa-f]+.*)/$", swhid_browse, name="browse-swhid-legacy", ), url( r"^(?P(swh|SWH):[0-9]+:[A-Za-z]+:[0-9A-Fa-f]+.*)$", swhid_browse, name="browse-swhid", ), url(r"^", include("swh.web.misc.urls")), + url(r"^", include("swh.web.add_forge_now.views")), url(r"^", include("swh.web.auth.views")), url(r"^logout/$", LogoutView.as_view(template_name="logout.html"), name="logout"), ] # allow to serve assets through django staticfiles # even if settings.DEBUG is False def insecure_serve(request, path, **kwargs): return serve(request, path, insecure=True, **kwargs) # enable to serve compressed assets through django development server if swh_web_config["serve_assets"]: static_pattern = r"^%s(?P.*)$" % settings.STATIC_URL[1:] urlpatterns.append(url(static_pattern, insecure_serve)) handler400 = swh_handle400 # noqa handler403 = swh_handle403 # noqa handler404 = swh_handle404 # noqa handler500 = swh_handle500 # noqa