diff --git a/assets/src/bundles/add_forge/request-dashboard.js b/assets/src/bundles/add_forge/request-dashboard.js index 7924c3e6..575b53a1 100644 --- a/assets/src/bundles/add_forge/request-dashboard.js +++ b/assets/src/bundles/add_forge/request-dashboard.js @@ -1,131 +1,131 @@ /** * Copyright (C) 2022 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ import {handleFetchError, csrfPost, getHumanReadableDate} from 'utils/functions'; import emailTempate from './forge-admin-email.ejs'; import requestHistoryItem from './add-request-history-item.ejs'; let forgeRequest; export function onRequestDashboardLoad(requestId) { $(document).ready(() => { populateRequestDetails(requestId); $('#contactForgeAdmin').click((event) => { contactForgeAdmin(event); }); $('#updateRequestForm').submit(async function(event) { event.preventDefault(); try { const response = await csrfPost($(this).attr('action'), {'Content-Type': 'application/x-www-form-urlencoded'}, $(this).serialize()); handleFetchError(response); $('#userMessage').text('The request status has been updated '); $('#userMessage').removeClass('badge-danger'); $('#userMessage').addClass('badge-success'); populateRequestDetails(requestId); } catch (response) { $('#userMessage').text('Sorry; Updating the request failed'); $('#userMessage').removeClass('badge-success'); $('#userMessage').addClass('badge-danger'); } }); }); } async function populateRequestDetails(requestId) { try { const response = await fetch(Urls.api_1_add_forge_request_get(requestId)); handleFetchError(response); const data = await response.json(); forgeRequest = data.request; $('#requestStatus').text(swh.add_forge.formatRequestStatusName(forgeRequest.status)); $('#requestType').text(forgeRequest.forge_type); $('#requestURL').text(forgeRequest.forge_url); $('#requestContactName').text(forgeRequest.forge_contact_name); $('#requestContactConsent').text(forgeRequest.submitter_forward_username); $('#requestContactEmail').text(forgeRequest.forge_contact_email); $('#submitterMessage').text(forgeRequest.forge_contact_comment); $('#updateComment').val(''); // Setting data for the email, now adding static data $('#contactForgeAdmin').attr('emailTo', forgeRequest.forge_contact_email); - $('#contactForgeAdmin').attr('emailSubject', `[swh-add_forge_now] Request ${forgeRequest.id}`); + $('#contactForgeAdmin').attr('emailSubject', `Software Heritage archival request for ${forgeRequest.forge_domain}`); populateRequestHistory(data.history); populateDecisionSelectOption(forgeRequest.status); } catch (response) { // The error message $('#fetchError').removeClass('d-none'); $('#requestDetails').addClass('d-none'); } } function populateRequestHistory(history) { $('#requestHistory').children().remove(); history.forEach((event, index) => { const historyEvent = requestHistoryItem({ 'event': event, 'index': index, 'getHumanReadableDate': getHumanReadableDate }); $('#requestHistory').append(historyEvent); }); } export function populateDecisionSelectOption(currentStatus) { const nextStatusesFor = { 'PENDING': ['WAITING_FOR_FEEDBACK', 'REJECTED', 'SUSPENDED'], 'WAITING_FOR_FEEDBACK': ['FEEDBACK_TO_HANDLE'], 'FEEDBACK_TO_HANDLE': [ 'WAITING_FOR_FEEDBACK', 'ACCEPTED', 'REJECTED', 'SUSPENDED' ], 'ACCEPTED': ['SCHEDULED'], 'SCHEDULED': [ 'FIRST_LISTING_DONE', 'FIRST_ORIGIN_LOADED' ], 'FIRST_LISTING_DONE': ['FIRST_ORIGIN_LOADED'], 'FIRST_ORIGIN_LOADED': [], 'REJECTED': [], 'SUSPENDED': ['PENDING'], 'DENIED': [] }; // Determine the possible next status out of the current one const nextStatuses = nextStatusesFor[currentStatus]; function addStatusOption(status, index) { // Push the next possible status options const label = swh.add_forge.formatRequestStatusName(status); $('#decisionOptions').append( `` ); } // Remove all the options and add new ones $('#decisionOptions').children().remove(); nextStatuses.forEach(addStatusOption); $('#decisionOptions').append( '' ); } function contactForgeAdmin(event) { // Open the mailclient with pre-filled text const mailTo = $('#contactForgeAdmin').attr('emailTo'); const subject = $('#contactForgeAdmin').attr('emailSubject'); const emailText = emailTempate({'forgeUrl': forgeRequest.forge_url}).trim().replace(/\n/g, '%0D%0A'); const w = window.open('', '_blank', '', true); w.location.href = `mailto: ${mailTo}?subject=${subject}&body=${emailText}`; w.focus(); } diff --git a/cypress/integration/add-forge-now-request-dashboard.spec.js b/cypress/integration/add-forge-now-request-dashboard.spec.js index 70bdaeca..2acd0772 100644 --- a/cypress/integration/add-forge-now-request-dashboard.spec.js +++ b/cypress/integration/add-forge-now-request-dashboard.spec.js @@ -1,229 +1,230 @@ /** * Copyright (C) 2022 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ -let requestId; +let requestId, forgeDomain; function createDummyRequest(urls) { cy.task('db:add_forge_now:delete'); cy.userLogin(); cy.getCookie('csrftoken').its('value').then((token) => { cy.request({ method: 'POST', url: urls.api_1_add_forge_request_create(), body: { forge_type: 'bitbucket', forge_url: 'test.example.com', forge_contact_email: 'test@example.com', forge_contact_name: 'test user', submitter_forward_username: true, forge_contact_comment: 'test comment' }, headers: { 'X-CSRFToken': token } }).then((response) => { - // setting requestId from response + // setting requestId and forgeDomain from response requestId = response.body.id; + forgeDomain = response.body.forge_domain; // logout the user cy.visit(urls.swh_web_homepage()); cy.contains('a', 'logout').click(); }); }); } describe('Test add forge now request dashboard load', function() { before(function() { // Create an add-forge-request object in the DB createDummyRequest(this.Urls); }); beforeEach(function() { const url = this.Urls.add_forge_now_request_dashboard(requestId); // request dashboard require admin permissions to view cy.adminLogin(); cy.intercept(`${this.Urls.api_1_add_forge_request_get(requestId)}**`).as('forgeRequestGet'); cy.visit(url); }); it('should load add forge request details', function() { cy.wait('@forgeRequestGet'); cy.get('#requestStatus') .should('contain', 'Pending'); cy.get('#requestType') .should('contain', 'bitbucket'); cy.get('#requestURL') .should('contain', 'test.example.com'); cy.get('#requestContactEmail') .should('contain', 'test@example.com'); cy.get('#requestContactName') .should('contain', 'test user'); cy.get('#requestContactEmail') .should('contain', 'test@example.com'); cy.get('#requestContactConsent') .should('contain', 'true'); cy.get('#submitterMessage') .should('contain', 'test comment'); }); it('should show send message link', function() { cy.wait('@forgeRequestGet'); cy.get('#contactForgeAdmin') .should('have.attr', 'emailto') .and('include', 'test@example.com'); cy.get('#contactForgeAdmin') .should('have.attr', 'emailsubject') - .and('include', `[swh-add_forge_now] Request ${requestId}`); + .and('include', `Software Heritage archival request for ${forgeDomain}`); }); it('should not show any error message', function() { cy.wait('@forgeRequestGet'); cy.get('#fetchError') .should('have.class', 'd-none'); cy.get('#requestDetails') .should('not.have.class', 'd-none'); }); it('should show error message for an api error', function() { // requesting with a non existing request ID const invalidRequestId = requestId + 10; const url = this.Urls.add_forge_now_request_dashboard(invalidRequestId); cy.intercept(`${this.Urls.api_1_add_forge_request_get(invalidRequestId)}**`, {statusCode: 400}).as('forgeAddInvalidRequest'); cy.visit(url); cy.wait('@forgeAddInvalidRequest'); cy.get('#fetchError') .should('not.have.class', 'd-none'); cy.get('#requestDetails') .should('have.class', 'd-none'); }); it('should load add forge request history', function() { cy.wait('@forgeRequestGet'); cy.get('#requestHistory') .children() .should('have.length', 1); cy.get('#requestHistory') .children() .should('contain', 'New status: Pending'); cy.get('#requestHistory') .should('contain', 'From user (SUBMITTER)'); }); it('should load possible next status', function() { cy.wait('@forgeRequestGet'); // 3 possible next status and the comment option cy.get('#decisionOptions') .children() .should('have.length', 4); }); }); function populateAndSubmitForm() { cy.get('#decisionOptions').select('WAITING_FOR_FEEDBACK'); cy.get('#updateComment').type('This is an update comment'); cy.get('#updateRequestForm').submit(); } describe('Test forge now request update', function() { beforeEach(function() { createDummyRequest(this.Urls); const url = this.Urls.add_forge_now_request_dashboard(requestId); cy.adminLogin(); // intercept GET API on page load cy.intercept(`${this.Urls.api_1_add_forge_request_get(requestId)}**`).as('forgeRequestGet'); // intercept update POST API cy.intercept('POST', `${this.Urls.api_1_add_forge_request_update(requestId)}**`).as('forgeRequestUpdate'); cy.visit(url); }); it('should submit correct details', function() { cy.wait('@forgeRequestGet'); populateAndSubmitForm(); // Making sure posting the right data cy.wait('@forgeRequestUpdate').its('request.body') .should('include', 'new_status') .should('include', 'text') .should('include', 'WAITING_FOR_FEEDBACK'); }); it('should show success message', function() { cy.wait('@forgeRequestGet'); populateAndSubmitForm(); // Making sure showing the success message cy.wait('@forgeRequestUpdate'); cy.get('#userMessage') .should('contain', 'The request status has been updated') .should('not.have.class', 'badge-danger') .should('have.class', 'badge-success'); }); it('should update the dashboard after submit', function() { cy.wait('@forgeRequestGet'); populateAndSubmitForm(); // Making sure the UI is updated after the submit cy.wait('@forgeRequestGet'); cy.get('#requestStatus') .should('contain', 'Waiting for feedback'); cy.get('#requestHistory') .children() .should('have.length', 2); cy.get('#requestHistory') .children() .should('contain', 'New status: Waiting for feedback'); cy.get('#requestHistory') .children() .should('contain', 'This is an update comment'); cy.get('#requestHistory') .children() .should('contain', 'Status changed to: Waiting for feedback'); cy.get('#decisionOptions') .children() .should('have.length', 2); }); it('should show an error on API failure', function() { cy.intercept('POST', `${this.Urls.api_1_add_forge_request_update(requestId)}**`, {forceNetworkError: true}) .as('updateFailedRequest'); cy.get('#updateComment').type('This is an update comment'); cy.get('#updateRequestForm').submit(); cy.wait('@updateFailedRequest'); cy.get('#userMessage') .should('contain', 'Sorry; Updating the request failed') .should('have.class', 'badge-danger') .should('not.have.class', 'badge-success'); }); }); diff --git a/swh/web/add_forge_now/models.py b/swh/web/add_forge_now/models.py index 61a77796..0a4b8aa9 100644 --- a/swh/web/add_forge_now/models.py +++ b/swh/web/add_forge_now/models.py @@ -1,120 +1,136 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations import enum from typing import List +from urllib.parse import urlparse from django.db import models from ..config import get_config from ..inbound_email.utils import get_address_for_pk from .apps import APP_LABEL class RequestStatus(enum.Enum): """Request statuses. Values are used in the ui. """ PENDING = "Pending" WAITING_FOR_FEEDBACK = "Waiting for feedback" FEEDBACK_TO_HANDLE = "Feedback to handle" ACCEPTED = "Accepted" SCHEDULED = "Scheduled" FIRST_LISTING_DONE = "First listing done" FIRST_ORIGIN_LOADED = "First origin loaded" REJECTED = "Rejected" SUSPENDED = "Suspended" DENIED = "Denied" @classmethod def choices(cls): return tuple((variant.name, variant.value) for variant in cls) def allowed_next_statuses(self) -> List[RequestStatus]: next_statuses = { self.PENDING: [self.WAITING_FOR_FEEDBACK, self.REJECTED, self.SUSPENDED], self.WAITING_FOR_FEEDBACK: [self.FEEDBACK_TO_HANDLE], self.FEEDBACK_TO_HANDLE: [ self.WAITING_FOR_FEEDBACK, self.ACCEPTED, self.REJECTED, self.SUSPENDED, ], self.ACCEPTED: [self.SCHEDULED], self.SCHEDULED: [ self.FIRST_LISTING_DONE, # in case of race condition between lister and loader: self.FIRST_ORIGIN_LOADED, ], self.FIRST_LISTING_DONE: [self.FIRST_ORIGIN_LOADED], self.FIRST_ORIGIN_LOADED: [], self.REJECTED: [], self.SUSPENDED: [self.PENDING], self.DENIED: [], } return next_statuses[self] # type: ignore class RequestActorRole(enum.Enum): MODERATOR = "moderator" SUBMITTER = "submitter" FORGE_ADMIN = "forge admin" @classmethod def choices(cls): return tuple((variant.name, variant.value) for variant in cls) class RequestHistory(models.Model): """Comment or status change. This is commented or changed by either submitter or moderator. """ request = models.ForeignKey("Request", models.DO_NOTHING) text = models.TextField() actor = models.TextField() actor_role = models.TextField(choices=RequestActorRole.choices()) date = models.DateTimeField(auto_now_add=True) new_status = models.TextField(choices=RequestStatus.choices(), null=True) class Meta: app_label = APP_LABEL db_table = "add_forge_request_history" class Request(models.Model): status = models.TextField( choices=RequestStatus.choices(), default=RequestStatus.PENDING.name, ) submission_date = models.DateTimeField(auto_now_add=True) submitter_name = models.TextField() submitter_email = models.TextField() submitter_forward_username = models.BooleanField(default=False) # FIXME: shall we do create a user model inside the webapp instead? forge_type = models.TextField() forge_url = models.TextField() forge_contact_email = models.EmailField() forge_contact_name = models.TextField() forge_contact_comment = models.TextField( null=True, help_text="Where did you find this contact information (url, ...)", ) class Meta: app_label = APP_LABEL db_table = "add_forge_request" @property def inbound_email_address(self) -> str: """Generate an email address for correspondence related to this request.""" base_address = get_config()["add_forge_now"]["email_address"] return get_address_for_pk(salt=APP_LABEL, base_address=base_address, pk=self.pk) + + @property + def forge_domain(self) -> str: + """Get the domain/netloc out of the forge_url. + + Fallback to using the first part of the url path, if the netloc can't be found + (for instance, if the url scheme hasn't been set). + """ + + parsed_url = urlparse(self.forge_url) + domain = parsed_url.netloc + if not domain: + domain = parsed_url.path.split("/", 1)[0] + + return domain diff --git a/swh/web/api/views/add_forge_now.py b/swh/web/api/views/add_forge_now.py index 3d2c95ed..ce30e026 100644 --- a/swh/web/api/views/add_forge_now.py +++ b/swh/web/api/views/add_forge_now.py @@ -1,397 +1,398 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, Union from django.core.exceptions import ObjectDoesNotExist from django.core.paginator import Paginator from django.db import transaction from django.db.models.query import QuerySet from django.forms import CharField, ModelForm from django.http import HttpResponseBadRequest from django.http.request import HttpRequest from django.http.response import HttpResponse, HttpResponseForbidden from rest_framework import serializers from rest_framework.request import Request from rest_framework.response import Response from swh.web.add_forge_now.models import Request as AddForgeRequest from swh.web.add_forge_now.models import RequestActorRole as AddForgeNowRequestActorRole from swh.web.add_forge_now.models import RequestHistory as AddForgeNowRequestHistory from swh.web.add_forge_now.models import RequestStatus as AddForgeNowRequestStatus from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.auth.utils import ADD_FORGE_MODERATOR_PERMISSION from swh.web.common.exc import BadInputExc from swh.web.common.utils import has_add_forge_now_permission, reverse def _block_while_testing(): """Replaced by tests to check concurrency behavior""" pass class AddForgeNowRequestForm(ModelForm): forge_contact_comment = CharField( required=False, ) class Meta: model = AddForgeRequest fields = ( "forge_type", "forge_url", "forge_contact_email", "forge_contact_name", "forge_contact_comment", "submitter_forward_username", ) class AddForgeNowRequestHistoryForm(ModelForm): new_status = CharField( max_length=200, required=False, ) class Meta: model = AddForgeNowRequestHistory fields = ("text", "new_status") class AddForgeNowRequestSerializer(serializers.ModelSerializer): inbound_email_address = serializers.CharField() + forge_domain = serializers.CharField() last_moderator = serializers.SerializerMethodField() last_modified_date = serializers.SerializerMethodField() history: Dict[int, QuerySet] = {} class Meta: model = AddForgeRequest fields = "__all__" def _gethistory(self, request): if request.id not in self.history: self.history[request.id] = AddForgeNowRequestHistory.objects.filter( request=request ).order_by("id") return self.history[request.id] def get_last_moderator(self, request): last_history_with_moderator = ( self._gethistory(request).filter(actor_role="MODERATOR").last() ) return ( last_history_with_moderator.actor if last_history_with_moderator else "None" ) def get_last_modified_date(self, request): last_history = self._gethistory(request).last() return ( last_history.date.isoformat().replace("+00:00", "Z") if last_history else None ) class AddForgeNowRequestPublicSerializer(serializers.ModelSerializer): """Serializes AddForgeRequest without private fields.""" class Meta: model = AddForgeRequest fields = ("id", "forge_url", "forge_type", "status", "submission_date") class AddForgeNowRequestHistorySerializer(serializers.ModelSerializer): class Meta: model = AddForgeNowRequestHistory exclude = ("request",) class AddForgeNowRequestHistoryPublicSerializer(serializers.ModelSerializer): class Meta: model = AddForgeNowRequestHistory fields = ("id", "date", "new_status", "actor_role") @api_route( r"/add-forge/request/create/", "api-1-add-forge-request-create", methods=["POST"], ) @api_doc("/add-forge/request/create") @format_docstring() @transaction.atomic def api_add_forge_request_create(request: Union[HttpRequest, Request]) -> HttpResponse: """ .. http:post:: /api/1/add-forge/request/create/ Create a new request to add a forge to the list of those crawled regularly by Software Heritage. .. warning:: That endpoint is not publicly available and requires authentication in order to be able to request it. {common_headers} :[0-9]+)/update/", "api-1-add-forge-request-update", methods=["POST"], ) @api_doc("/add-forge/request/update", tags=["hidden"]) @format_docstring() @transaction.atomic def api_add_forge_request_update( request: Union[HttpRequest, Request], id: int ) -> HttpResponse: """ .. http:post:: /api/1/add-forge/request/update/ Update a request to add a forge to the list of those crawled regularly by Software Heritage. .. warning:: That endpoint is not publicly available and requires authentication in order to be able to request it. {common_headers} :[0-9]+)/get/", "api-1-add-forge-request-get", methods=["GET"], ) @api_doc("/add-forge/request/get") @format_docstring() def api_add_forge_request_get(request: Request, id: int): """ .. http:get:: /api/1/add-forge/request/get/ Return all details about an add-forge request. {common_headers} :param int id: add-forge request identifier :statuscode 200: request details successfully returned :statuscode 400: request identifier does not exist """ try: add_forge_request = AddForgeRequest.objects.get(id=id) except ObjectDoesNotExist: raise BadInputExc("Request id does not exist") request_history = AddForgeNowRequestHistory.objects.filter( request=add_forge_request ).order_by("id") if request.user.is_authenticated and request.user.has_perm( ADD_FORGE_MODERATOR_PERMISSION ): data = AddForgeNowRequestSerializer(add_forge_request).data history = AddForgeNowRequestHistorySerializer(request_history, many=True).data else: data = AddForgeNowRequestPublicSerializer(add_forge_request).data history = AddForgeNowRequestHistoryPublicSerializer( request_history, many=True ).data return {"request": data, "history": history} diff --git a/swh/web/tests/add_forge_now/test_models.py b/swh/web/tests/add_forge_now/test_models.py index 86e896f1..e9e617e2 100644 --- a/swh/web/tests/add_forge_now/test_models.py +++ b/swh/web/tests/add_forge_now/test_models.py @@ -1,26 +1,38 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -from swh.web.add_forge_now.models import RequestStatus +from swh.web.add_forge_now.models import Request, RequestStatus @pytest.mark.parametrize( "current_status, allowed_next_statuses", [ ( RequestStatus.PENDING, [ RequestStatus.WAITING_FOR_FEEDBACK, RequestStatus.REJECTED, RequestStatus.SUSPENDED, ], ), (RequestStatus.WAITING_FOR_FEEDBACK, [RequestStatus.FEEDBACK_TO_HANDLE]), ], ) def test_allowed_next_statuses(current_status, allowed_next_statuses): assert current_status.allowed_next_statuses() == allowed_next_statuses + + +@pytest.mark.parametrize( + "forge_url, expected_domain", + [ + ("https://gitlab.example.com/foo/bar", "gitlab.example.com"), + ("gitlab.example.com", "gitlab.example.com"), + ("gitlab.example.com/foo/bar", "gitlab.example.com"), + ], +) +def test_request_forge_domain(forge_url, expected_domain): + assert Request(forge_url=forge_url).forge_domain == expected_domain diff --git a/swh/web/tests/api/views/test_add_forge_now.py b/swh/web/tests/api/views/test_add_forge_now.py index 538fa50f..c1843e87 100644 --- a/swh/web/tests/api/views/test_add_forge_now.py +++ b/swh/web/tests/api/views/test_add_forge_now.py @@ -1,564 +1,569 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import threading import time from typing import Dict -from urllib.parse import urlencode +from urllib.parse import urlencode, urlparse import iso8601 import pytest from swh.web.add_forge_now.models import Request from swh.web.common.utils import reverse from swh.web.config import get_config from swh.web.inbound_email.utils import get_address_for_pk from swh.web.tests.utils import ( check_api_get_responses, check_api_post_response, check_http_post_response, ) @pytest.mark.django_db def test_add_forge_request_create_anonymous_user(api_client): url = reverse("api-1-add-forge-request-create") check_api_post_response(api_client, url, status_code=403) @pytest.mark.django_db def test_add_forge_request_create_empty(api_client, regular_user): api_client.force_login(regular_user) url = reverse("api-1-add-forge-request-create") resp = check_api_post_response(api_client, url, status_code=400) assert '"forge_type"' in resp.data["reason"] ADD_FORGE_DATA_FORGE1: Dict = { "forge_type": "gitlab", "forge_url": "https://gitlab.example.org", "forge_contact_email": "admin@gitlab.example.org", "forge_contact_name": "gitlab.example.org admin", "forge_contact_comment": "user marked as owner in forge members", "submitter_forward_username": True, } ADD_FORGE_DATA_FORGE2: Dict = { "forge_type": "gitea", "forge_url": "https://gitea.example.org", "forge_contact_email": "admin@gitea.example.org", "forge_contact_name": "gitea.example.org admin", "forge_contact_comment": "user marked as owner in forge members", "submitter_forward_username": True, } ADD_FORGE_DATA_FORGE3: Dict = { "forge_type": "heptapod", "forge_url": "https://heptapod.host/", "forge_contact_email": "admin@example.org", "forge_contact_name": "heptapod admin", "forge_contact_comment": "", # authorized empty or null comment "submitter_forward_username": False, } ADD_FORGE_DATA_FORGE4: Dict = { **ADD_FORGE_DATA_FORGE3, "forge_url": "https://heptapod2.host/", "submitter_forward_username": "on", } ADD_FORGE_DATA_FORGE5: Dict = { **ADD_FORGE_DATA_FORGE3, "forge_url": "https://heptapod3.host/", "submitter_forward_username": "off", } def inbound_email_for_pk(pk: int) -> str: """Check that the inbound email matches the one expected for the given pk""" base_address = get_config()["add_forge_now"]["email_address"] return get_address_for_pk( salt="swh_web_add_forge_now", base_address=base_address, pk=pk ) @pytest.mark.django_db(transaction=True, reset_sequences=True) @pytest.mark.parametrize( "add_forge_data", [ ADD_FORGE_DATA_FORGE1, ADD_FORGE_DATA_FORGE2, ADD_FORGE_DATA_FORGE3, ADD_FORGE_DATA_FORGE4, ], ) def test_add_forge_request_create_success_post( api_client, regular_user, add_forge_data ): api_client.force_login(regular_user) url = reverse("api-1-add-forge-request-create") date_before = datetime.datetime.now(tz=datetime.timezone.utc) resp = check_api_post_response( api_client, url, data=add_forge_data, status_code=201, ) date_after = datetime.datetime.now(tz=datetime.timezone.utc) consent = add_forge_data["submitter_forward_username"] # map the expected result with what's expectedly read from the db to ease comparison expected_consent_bool = consent == "on" if isinstance(consent, str) else consent assert resp.data == { **add_forge_data, "id": resp.data["id"], "status": "PENDING", "submission_date": resp.data["submission_date"], "submitter_name": regular_user.username, "submitter_email": regular_user.email, "submitter_forward_username": expected_consent_bool, "last_moderator": resp.data["last_moderator"], "last_modified_date": resp.data["last_modified_date"], "inbound_email_address": inbound_email_for_pk(resp.data["id"]), + "forge_domain": urlparse(add_forge_data["forge_url"]).netloc, } assert date_before < iso8601.parse_date(resp.data["submission_date"]) < date_after request = Request.objects.all().last() assert request.forge_url == add_forge_data["forge_url"] assert request.submitter_name == regular_user.username @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_create_success_form_encoded(client, regular_user): client.force_login(regular_user) url = reverse("api-1-add-forge-request-create") date_before = datetime.datetime.now(tz=datetime.timezone.utc) resp = check_http_post_response( client, url, request_content_type="application/x-www-form-urlencoded", data=urlencode(ADD_FORGE_DATA_FORGE1), status_code=201, ) date_after = datetime.datetime.now(tz=datetime.timezone.utc) assert resp.data == { **ADD_FORGE_DATA_FORGE1, "id": resp.data["id"], "status": "PENDING", "submission_date": resp.data["submission_date"], "submitter_name": regular_user.username, "submitter_email": regular_user.email, "last_moderator": resp.data["last_moderator"], "last_modified_date": resp.data["last_modified_date"], "inbound_email_address": inbound_email_for_pk(1), + "forge_domain": urlparse(ADD_FORGE_DATA_FORGE1["forge_url"]).netloc, } assert date_before < iso8601.parse_date(resp.data["submission_date"]) < date_after request = Request.objects.all()[0] assert request.forge_url == ADD_FORGE_DATA_FORGE1["forge_url"] assert request.submitter_name == regular_user.username @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_create_duplicate(api_client, regular_user): api_client.force_login(regular_user) url = reverse("api-1-add-forge-request-create") check_api_post_response( api_client, url, data=ADD_FORGE_DATA_FORGE1, status_code=201, ) check_api_post_response( api_client, url, data=ADD_FORGE_DATA_FORGE1, status_code=409, ) requests = Request.objects.all() assert len(requests) == 1 @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_anonymous_user(api_client): url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response(api_client, url, status_code=403) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_regular_user(api_client, regular_user): api_client.force_login(regular_user) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response(api_client, url, status_code=403) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_non_existent(api_client, add_forge_moderator): api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response(api_client, url, status_code=400) def create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE1): api_client.force_login(regular_user) url = reverse("api-1-add-forge-request-create") return check_api_post_response( api_client, url, data=data, status_code=201, ) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_empty(api_client, regular_user, add_forge_moderator): create_add_forge_request(api_client, regular_user) api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response(api_client, url, status_code=400) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_missing_field( api_client, regular_user, add_forge_moderator ): create_add_forge_request(api_client, regular_user) api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response(api_client, url, data={}, status_code=400) check_api_post_response( api_client, url, data={"new_status": "REJECTED"}, status_code=400 ) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update(api_client, regular_user, add_forge_moderator): create_add_forge_request(api_client, regular_user) api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response( api_client, url, data={"text": "updating request"}, status_code=200 ) check_api_post_response( api_client, url, data={"new_status": "REJECTED", "text": "request rejected"}, status_code=200, ) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_invalid_new_status( api_client, regular_user, add_forge_moderator ): create_add_forge_request(api_client, regular_user) api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) check_api_post_response( api_client, url, data={"new_status": "ACCEPTED", "text": "request accepted"}, status_code=400, ) @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_update_status_concurrent( api_client, regular_user, add_forge_moderator, mocker ): _block_while_testing = mocker.patch( "swh.web.api.views.add_forge_now._block_while_testing" ) _block_while_testing.side_effect = lambda: time.sleep(1) create_add_forge_request(api_client, regular_user) api_client.force_login(add_forge_moderator) url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) worker_ended = False def worker(): nonlocal worker_ended check_api_post_response( api_client, url, data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"}, status_code=200, ) worker_ended = True # this thread will first modify the request status to WAITING_FOR_FEEDBACK thread = threading.Thread(target=worker) thread.start() # the other thread (slower) will attempt to modify the request status to REJECTED # but it will not be allowed as the first faster thread already modified it # and REJECTED state can not be reached from WAITING_FOR_FEEDBACK one time.sleep(0.5) check_api_post_response( api_client, url, data={"new_status": "REJECTED", "text": "request accepted"}, status_code=400, ) thread.join() assert worker_ended @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_list_anonymous(api_client, regular_user): url = reverse("api-1-add-forge-request-list") resp = check_api_get_responses(api_client, url, status_code=200) assert resp.data == [] create_add_forge_request(api_client, regular_user) resp = check_api_get_responses(api_client, url, status_code=200) add_forge_request = { "forge_url": ADD_FORGE_DATA_FORGE1["forge_url"], "forge_type": ADD_FORGE_DATA_FORGE1["forge_type"], "status": "PENDING", "submission_date": resp.data[0]["submission_date"], "id": resp.data[0]["id"], } assert resp.data == [add_forge_request] create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE2) resp = check_api_get_responses(api_client, url, status_code=200) other_forge_request = { "forge_url": ADD_FORGE_DATA_FORGE2["forge_url"], "forge_type": ADD_FORGE_DATA_FORGE2["forge_type"], "status": "PENDING", "submission_date": resp.data[0]["submission_date"], "id": resp.data[0]["id"], } assert resp.data == [other_forge_request, add_forge_request] @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_list_moderator( api_client, regular_user, add_forge_moderator ): url = reverse("api-1-add-forge-request-list") create_add_forge_request(api_client, regular_user) create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE2) api_client.force_login(add_forge_moderator) resp = check_api_get_responses(api_client, url, status_code=200) add_forge_request = { **ADD_FORGE_DATA_FORGE1, "status": "PENDING", "submission_date": resp.data[1]["submission_date"], "submitter_name": regular_user.username, "submitter_email": regular_user.email, "last_moderator": resp.data[1]["last_moderator"], "last_modified_date": resp.data[1]["last_modified_date"], "id": resp.data[1]["id"], "inbound_email_address": inbound_email_for_pk(resp.data[1]["id"]), + "forge_domain": urlparse(ADD_FORGE_DATA_FORGE1["forge_url"]).netloc, } other_forge_request = { **ADD_FORGE_DATA_FORGE2, "status": "PENDING", "submission_date": resp.data[0]["submission_date"], "submitter_name": regular_user.username, "submitter_email": regular_user.email, "last_moderator": resp.data[0]["last_moderator"], "last_modified_date": resp.data[0]["last_modified_date"], "id": resp.data[0]["id"], "inbound_email_address": inbound_email_for_pk(resp.data[0]["id"]), + "forge_domain": urlparse(ADD_FORGE_DATA_FORGE2["forge_url"]).netloc, } assert resp.data == [other_forge_request, add_forge_request] @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_list_pagination( api_client, regular_user, api_request_factory ): create_add_forge_request(api_client, regular_user) create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE2) url = reverse("api-1-add-forge-request-list", query_params={"per_page": 1}) resp = check_api_get_responses(api_client, url, 200) assert len(resp.data) == 1 request = api_request_factory.get(url) next_url = reverse( "api-1-add-forge-request-list", query_params={"page": 2, "per_page": 1}, request=request, ) assert resp["Link"] == f'<{next_url}>; rel="next"' resp = check_api_get_responses(api_client, next_url, 200) assert len(resp.data) == 1 prev_url = reverse( "api-1-add-forge-request-list", query_params={"page": 1, "per_page": 1}, request=request, ) assert resp["Link"] == f'<{prev_url}>; rel="previous"' @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_list_submitter_filtering( api_client, regular_user, regular_user2 ): create_add_forge_request(api_client, regular_user) create_add_forge_request(api_client, regular_user2, data=ADD_FORGE_DATA_FORGE2) api_client.force_login(regular_user) url = reverse( "api-1-add-forge-request-list", query_params={"user_requests_only": 1} ) resp = check_api_get_responses(api_client, url, status_code=200) assert len(resp.data) == 1 @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_get(api_client, regular_user, add_forge_moderator): resp = create_add_forge_request(api_client, regular_user) submission_date = resp.data["submission_date"] url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) api_client.force_login(add_forge_moderator) check_api_post_response( api_client, url, data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"}, status_code=200, ) api_client.logout() url = reverse("api-1-add-forge-request-get", url_args={"id": 1}) resp = check_api_get_responses(api_client, url, status_code=200) assert resp.data == { "request": { "forge_url": ADD_FORGE_DATA_FORGE1["forge_url"], "forge_type": ADD_FORGE_DATA_FORGE1["forge_type"], "id": 1, "status": "WAITING_FOR_FEEDBACK", "submission_date": submission_date, }, "history": [ { "id": 1, "actor_role": "SUBMITTER", "date": resp.data["history"][0]["date"], "new_status": "PENDING", }, { "id": 2, "actor_role": "MODERATOR", "date": resp.data["history"][1]["date"], "new_status": "WAITING_FOR_FEEDBACK", }, ], } @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_get_moderator(api_client, regular_user, add_forge_moderator): resp = create_add_forge_request(api_client, regular_user) submission_date = resp.data["submission_date"] url = reverse("api-1-add-forge-request-update", url_args={"id": 1}) api_client.force_login(add_forge_moderator) check_api_post_response( api_client, url, data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"}, status_code=200, ) url = reverse("api-1-add-forge-request-get", url_args={"id": 1}) resp = check_api_get_responses(api_client, url, status_code=200) resp.data["history"] = [dict(history_item) for history_item in resp.data["history"]] assert resp.data == { "request": { **ADD_FORGE_DATA_FORGE1, "id": 1, "status": "WAITING_FOR_FEEDBACK", "submission_date": submission_date, "submitter_name": regular_user.username, "submitter_email": regular_user.email, "last_moderator": add_forge_moderator.username, "last_modified_date": resp.data["history"][1]["date"], "inbound_email_address": inbound_email_for_pk(1), + "forge_domain": urlparse(ADD_FORGE_DATA_FORGE1["forge_url"]).netloc, }, "history": [ { "id": 1, "text": "", "actor": regular_user.username, "actor_role": "SUBMITTER", "date": resp.data["history"][0]["date"], "new_status": "PENDING", }, { "id": 2, "text": "waiting for message", "actor": add_forge_moderator.username, "actor_role": "MODERATOR", "date": resp.data["history"][1]["date"], "new_status": "WAITING_FOR_FEEDBACK", }, ], } @pytest.mark.django_db(transaction=True, reset_sequences=True) def test_add_forge_request_get_invalid(api_client): url = reverse("api-1-add-forge-request-get", url_args={"id": 3}) check_api_get_responses(api_client, url, status_code=400)