diff --git a/assets/src/bundles/add_forge/request-dashboard.js b/assets/src/bundles/add_forge/request-dashboard.js
index 7924c3e6..575b53a1 100644
--- a/assets/src/bundles/add_forge/request-dashboard.js
+++ b/assets/src/bundles/add_forge/request-dashboard.js
@@ -1,131 +1,131 @@
/**
* Copyright (C) 2022 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU Affero General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
import {handleFetchError, csrfPost, getHumanReadableDate} from 'utils/functions';
import emailTempate from './forge-admin-email.ejs';
import requestHistoryItem from './add-request-history-item.ejs';
let forgeRequest;
export function onRequestDashboardLoad(requestId) {
$(document).ready(() => {
populateRequestDetails(requestId);
$('#contactForgeAdmin').click((event) => {
contactForgeAdmin(event);
});
$('#updateRequestForm').submit(async function(event) {
event.preventDefault();
try {
const response = await csrfPost($(this).attr('action'),
{'Content-Type': 'application/x-www-form-urlencoded'},
$(this).serialize());
handleFetchError(response);
$('#userMessage').text('The request status has been updated ');
$('#userMessage').removeClass('badge-danger');
$('#userMessage').addClass('badge-success');
populateRequestDetails(requestId);
} catch (response) {
$('#userMessage').text('Sorry; Updating the request failed');
$('#userMessage').removeClass('badge-success');
$('#userMessage').addClass('badge-danger');
}
});
});
}
async function populateRequestDetails(requestId) {
try {
const response = await fetch(Urls.api_1_add_forge_request_get(requestId));
handleFetchError(response);
const data = await response.json();
forgeRequest = data.request;
$('#requestStatus').text(swh.add_forge.formatRequestStatusName(forgeRequest.status));
$('#requestType').text(forgeRequest.forge_type);
$('#requestURL').text(forgeRequest.forge_url);
$('#requestContactName').text(forgeRequest.forge_contact_name);
$('#requestContactConsent').text(forgeRequest.submitter_forward_username);
$('#requestContactEmail').text(forgeRequest.forge_contact_email);
$('#submitterMessage').text(forgeRequest.forge_contact_comment);
$('#updateComment').val('');
// Setting data for the email, now adding static data
$('#contactForgeAdmin').attr('emailTo', forgeRequest.forge_contact_email);
- $('#contactForgeAdmin').attr('emailSubject', `[swh-add_forge_now] Request ${forgeRequest.id}`);
+ $('#contactForgeAdmin').attr('emailSubject', `Software Heritage archival request for ${forgeRequest.forge_domain}`);
populateRequestHistory(data.history);
populateDecisionSelectOption(forgeRequest.status);
} catch (response) {
// The error message
$('#fetchError').removeClass('d-none');
$('#requestDetails').addClass('d-none');
}
}
function populateRequestHistory(history) {
$('#requestHistory').children().remove();
history.forEach((event, index) => {
const historyEvent = requestHistoryItem({
'event': event,
'index': index,
'getHumanReadableDate': getHumanReadableDate
});
$('#requestHistory').append(historyEvent);
});
}
export function populateDecisionSelectOption(currentStatus) {
const nextStatusesFor = {
'PENDING': ['WAITING_FOR_FEEDBACK', 'REJECTED', 'SUSPENDED'],
'WAITING_FOR_FEEDBACK': ['FEEDBACK_TO_HANDLE'],
'FEEDBACK_TO_HANDLE': [
'WAITING_FOR_FEEDBACK',
'ACCEPTED',
'REJECTED',
'SUSPENDED'
],
'ACCEPTED': ['SCHEDULED'],
'SCHEDULED': [
'FIRST_LISTING_DONE',
'FIRST_ORIGIN_LOADED'
],
'FIRST_LISTING_DONE': ['FIRST_ORIGIN_LOADED'],
'FIRST_ORIGIN_LOADED': [],
'REJECTED': [],
'SUSPENDED': ['PENDING'],
'DENIED': []
};
// Determine the possible next status out of the current one
const nextStatuses = nextStatusesFor[currentStatus];
function addStatusOption(status, index) {
// Push the next possible status options
const label = swh.add_forge.formatRequestStatusName(status);
$('#decisionOptions').append(
``
);
}
// Remove all the options and add new ones
$('#decisionOptions').children().remove();
nextStatuses.forEach(addStatusOption);
$('#decisionOptions').append(
''
);
}
function contactForgeAdmin(event) {
// Open the mailclient with pre-filled text
const mailTo = $('#contactForgeAdmin').attr('emailTo');
const subject = $('#contactForgeAdmin').attr('emailSubject');
const emailText = emailTempate({'forgeUrl': forgeRequest.forge_url}).trim().replace(/\n/g, '%0D%0A');
const w = window.open('', '_blank', '', true);
w.location.href = `mailto: ${mailTo}?subject=${subject}&body=${emailText}`;
w.focus();
}
diff --git a/cypress/integration/add-forge-now-request-dashboard.spec.js b/cypress/integration/add-forge-now-request-dashboard.spec.js
index 70bdaeca..2acd0772 100644
--- a/cypress/integration/add-forge-now-request-dashboard.spec.js
+++ b/cypress/integration/add-forge-now-request-dashboard.spec.js
@@ -1,229 +1,230 @@
/**
* Copyright (C) 2022 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU Affero General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
-let requestId;
+let requestId, forgeDomain;
function createDummyRequest(urls) {
cy.task('db:add_forge_now:delete');
cy.userLogin();
cy.getCookie('csrftoken').its('value').then((token) => {
cy.request({
method: 'POST',
url: urls.api_1_add_forge_request_create(),
body: {
forge_type: 'bitbucket',
forge_url: 'test.example.com',
forge_contact_email: 'test@example.com',
forge_contact_name: 'test user',
submitter_forward_username: true,
forge_contact_comment: 'test comment'
},
headers: {
'X-CSRFToken': token
}
}).then((response) => {
- // setting requestId from response
+ // setting requestId and forgeDomain from response
requestId = response.body.id;
+ forgeDomain = response.body.forge_domain;
// logout the user
cy.visit(urls.swh_web_homepage());
cy.contains('a', 'logout').click();
});
});
}
describe('Test add forge now request dashboard load', function() {
before(function() {
// Create an add-forge-request object in the DB
createDummyRequest(this.Urls);
});
beforeEach(function() {
const url = this.Urls.add_forge_now_request_dashboard(requestId);
// request dashboard require admin permissions to view
cy.adminLogin();
cy.intercept(`${this.Urls.api_1_add_forge_request_get(requestId)}**`).as('forgeRequestGet');
cy.visit(url);
});
it('should load add forge request details', function() {
cy.wait('@forgeRequestGet');
cy.get('#requestStatus')
.should('contain', 'Pending');
cy.get('#requestType')
.should('contain', 'bitbucket');
cy.get('#requestURL')
.should('contain', 'test.example.com');
cy.get('#requestContactEmail')
.should('contain', 'test@example.com');
cy.get('#requestContactName')
.should('contain', 'test user');
cy.get('#requestContactEmail')
.should('contain', 'test@example.com');
cy.get('#requestContactConsent')
.should('contain', 'true');
cy.get('#submitterMessage')
.should('contain', 'test comment');
});
it('should show send message link', function() {
cy.wait('@forgeRequestGet');
cy.get('#contactForgeAdmin')
.should('have.attr', 'emailto')
.and('include', 'test@example.com');
cy.get('#contactForgeAdmin')
.should('have.attr', 'emailsubject')
- .and('include', `[swh-add_forge_now] Request ${requestId}`);
+ .and('include', `Software Heritage archival request for ${forgeDomain}`);
});
it('should not show any error message', function() {
cy.wait('@forgeRequestGet');
cy.get('#fetchError')
.should('have.class', 'd-none');
cy.get('#requestDetails')
.should('not.have.class', 'd-none');
});
it('should show error message for an api error', function() {
// requesting with a non existing request ID
const invalidRequestId = requestId + 10;
const url = this.Urls.add_forge_now_request_dashboard(invalidRequestId);
cy.intercept(`${this.Urls.api_1_add_forge_request_get(invalidRequestId)}**`,
{statusCode: 400}).as('forgeAddInvalidRequest');
cy.visit(url);
cy.wait('@forgeAddInvalidRequest');
cy.get('#fetchError')
.should('not.have.class', 'd-none');
cy.get('#requestDetails')
.should('have.class', 'd-none');
});
it('should load add forge request history', function() {
cy.wait('@forgeRequestGet');
cy.get('#requestHistory')
.children()
.should('have.length', 1);
cy.get('#requestHistory')
.children()
.should('contain', 'New status: Pending');
cy.get('#requestHistory')
.should('contain', 'From user (SUBMITTER)');
});
it('should load possible next status', function() {
cy.wait('@forgeRequestGet');
// 3 possible next status and the comment option
cy.get('#decisionOptions')
.children()
.should('have.length', 4);
});
});
function populateAndSubmitForm() {
cy.get('#decisionOptions').select('WAITING_FOR_FEEDBACK');
cy.get('#updateComment').type('This is an update comment');
cy.get('#updateRequestForm').submit();
}
describe('Test forge now request update', function() {
beforeEach(function() {
createDummyRequest(this.Urls);
const url = this.Urls.add_forge_now_request_dashboard(requestId);
cy.adminLogin();
// intercept GET API on page load
cy.intercept(`${this.Urls.api_1_add_forge_request_get(requestId)}**`).as('forgeRequestGet');
// intercept update POST API
cy.intercept('POST', `${this.Urls.api_1_add_forge_request_update(requestId)}**`).as('forgeRequestUpdate');
cy.visit(url);
});
it('should submit correct details', function() {
cy.wait('@forgeRequestGet');
populateAndSubmitForm();
// Making sure posting the right data
cy.wait('@forgeRequestUpdate').its('request.body')
.should('include', 'new_status')
.should('include', 'text')
.should('include', 'WAITING_FOR_FEEDBACK');
});
it('should show success message', function() {
cy.wait('@forgeRequestGet');
populateAndSubmitForm();
// Making sure showing the success message
cy.wait('@forgeRequestUpdate');
cy.get('#userMessage')
.should('contain', 'The request status has been updated')
.should('not.have.class', 'badge-danger')
.should('have.class', 'badge-success');
});
it('should update the dashboard after submit', function() {
cy.wait('@forgeRequestGet');
populateAndSubmitForm();
// Making sure the UI is updated after the submit
cy.wait('@forgeRequestGet');
cy.get('#requestStatus')
.should('contain', 'Waiting for feedback');
cy.get('#requestHistory')
.children()
.should('have.length', 2);
cy.get('#requestHistory')
.children()
.should('contain', 'New status: Waiting for feedback');
cy.get('#requestHistory')
.children()
.should('contain', 'This is an update comment');
cy.get('#requestHistory')
.children()
.should('contain', 'Status changed to: Waiting for feedback');
cy.get('#decisionOptions')
.children()
.should('have.length', 2);
});
it('should show an error on API failure', function() {
cy.intercept('POST',
`${this.Urls.api_1_add_forge_request_update(requestId)}**`,
{forceNetworkError: true})
.as('updateFailedRequest');
cy.get('#updateComment').type('This is an update comment');
cy.get('#updateRequestForm').submit();
cy.wait('@updateFailedRequest');
cy.get('#userMessage')
.should('contain', 'Sorry; Updating the request failed')
.should('have.class', 'badge-danger')
.should('not.have.class', 'badge-success');
});
});
diff --git a/swh/web/add_forge_now/models.py b/swh/web/add_forge_now/models.py
index 61a77796..0a4b8aa9 100644
--- a/swh/web/add_forge_now/models.py
+++ b/swh/web/add_forge_now/models.py
@@ -1,120 +1,136 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import enum
from typing import List
+from urllib.parse import urlparse
from django.db import models
from ..config import get_config
from ..inbound_email.utils import get_address_for_pk
from .apps import APP_LABEL
class RequestStatus(enum.Enum):
"""Request statuses.
Values are used in the ui.
"""
PENDING = "Pending"
WAITING_FOR_FEEDBACK = "Waiting for feedback"
FEEDBACK_TO_HANDLE = "Feedback to handle"
ACCEPTED = "Accepted"
SCHEDULED = "Scheduled"
FIRST_LISTING_DONE = "First listing done"
FIRST_ORIGIN_LOADED = "First origin loaded"
REJECTED = "Rejected"
SUSPENDED = "Suspended"
DENIED = "Denied"
@classmethod
def choices(cls):
return tuple((variant.name, variant.value) for variant in cls)
def allowed_next_statuses(self) -> List[RequestStatus]:
next_statuses = {
self.PENDING: [self.WAITING_FOR_FEEDBACK, self.REJECTED, self.SUSPENDED],
self.WAITING_FOR_FEEDBACK: [self.FEEDBACK_TO_HANDLE],
self.FEEDBACK_TO_HANDLE: [
self.WAITING_FOR_FEEDBACK,
self.ACCEPTED,
self.REJECTED,
self.SUSPENDED,
],
self.ACCEPTED: [self.SCHEDULED],
self.SCHEDULED: [
self.FIRST_LISTING_DONE,
# in case of race condition between lister and loader:
self.FIRST_ORIGIN_LOADED,
],
self.FIRST_LISTING_DONE: [self.FIRST_ORIGIN_LOADED],
self.FIRST_ORIGIN_LOADED: [],
self.REJECTED: [],
self.SUSPENDED: [self.PENDING],
self.DENIED: [],
}
return next_statuses[self] # type: ignore
class RequestActorRole(enum.Enum):
MODERATOR = "moderator"
SUBMITTER = "submitter"
FORGE_ADMIN = "forge admin"
@classmethod
def choices(cls):
return tuple((variant.name, variant.value) for variant in cls)
class RequestHistory(models.Model):
"""Comment or status change. This is commented or changed by either submitter or
moderator.
"""
request = models.ForeignKey("Request", models.DO_NOTHING)
text = models.TextField()
actor = models.TextField()
actor_role = models.TextField(choices=RequestActorRole.choices())
date = models.DateTimeField(auto_now_add=True)
new_status = models.TextField(choices=RequestStatus.choices(), null=True)
class Meta:
app_label = APP_LABEL
db_table = "add_forge_request_history"
class Request(models.Model):
status = models.TextField(
choices=RequestStatus.choices(),
default=RequestStatus.PENDING.name,
)
submission_date = models.DateTimeField(auto_now_add=True)
submitter_name = models.TextField()
submitter_email = models.TextField()
submitter_forward_username = models.BooleanField(default=False)
# FIXME: shall we do create a user model inside the webapp instead?
forge_type = models.TextField()
forge_url = models.TextField()
forge_contact_email = models.EmailField()
forge_contact_name = models.TextField()
forge_contact_comment = models.TextField(
null=True,
help_text="Where did you find this contact information (url, ...)",
)
class Meta:
app_label = APP_LABEL
db_table = "add_forge_request"
@property
def inbound_email_address(self) -> str:
"""Generate an email address for correspondence related to this request."""
base_address = get_config()["add_forge_now"]["email_address"]
return get_address_for_pk(salt=APP_LABEL, base_address=base_address, pk=self.pk)
+
+ @property
+ def forge_domain(self) -> str:
+ """Get the domain/netloc out of the forge_url.
+
+ Fallback to using the first part of the url path, if the netloc can't be found
+ (for instance, if the url scheme hasn't been set).
+ """
+
+ parsed_url = urlparse(self.forge_url)
+ domain = parsed_url.netloc
+ if not domain:
+ domain = parsed_url.path.split("/", 1)[0]
+
+ return domain
diff --git a/swh/web/api/views/add_forge_now.py b/swh/web/api/views/add_forge_now.py
index 3d2c95ed..ce30e026 100644
--- a/swh/web/api/views/add_forge_now.py
+++ b/swh/web/api/views/add_forge_now.py
@@ -1,397 +1,398 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from typing import Any, Dict, Union
from django.core.exceptions import ObjectDoesNotExist
from django.core.paginator import Paginator
from django.db import transaction
from django.db.models.query import QuerySet
from django.forms import CharField, ModelForm
from django.http import HttpResponseBadRequest
from django.http.request import HttpRequest
from django.http.response import HttpResponse, HttpResponseForbidden
from rest_framework import serializers
from rest_framework.request import Request
from rest_framework.response import Response
from swh.web.add_forge_now.models import Request as AddForgeRequest
from swh.web.add_forge_now.models import RequestActorRole as AddForgeNowRequestActorRole
from swh.web.add_forge_now.models import RequestHistory as AddForgeNowRequestHistory
from swh.web.add_forge_now.models import RequestStatus as AddForgeNowRequestStatus
from swh.web.api.apidoc import api_doc, format_docstring
from swh.web.api.apiurls import api_route
from swh.web.auth.utils import ADD_FORGE_MODERATOR_PERMISSION
from swh.web.common.exc import BadInputExc
from swh.web.common.utils import has_add_forge_now_permission, reverse
def _block_while_testing():
"""Replaced by tests to check concurrency behavior"""
pass
class AddForgeNowRequestForm(ModelForm):
forge_contact_comment = CharField(
required=False,
)
class Meta:
model = AddForgeRequest
fields = (
"forge_type",
"forge_url",
"forge_contact_email",
"forge_contact_name",
"forge_contact_comment",
"submitter_forward_username",
)
class AddForgeNowRequestHistoryForm(ModelForm):
new_status = CharField(
max_length=200,
required=False,
)
class Meta:
model = AddForgeNowRequestHistory
fields = ("text", "new_status")
class AddForgeNowRequestSerializer(serializers.ModelSerializer):
inbound_email_address = serializers.CharField()
+ forge_domain = serializers.CharField()
last_moderator = serializers.SerializerMethodField()
last_modified_date = serializers.SerializerMethodField()
history: Dict[int, QuerySet] = {}
class Meta:
model = AddForgeRequest
fields = "__all__"
def _gethistory(self, request):
if request.id not in self.history:
self.history[request.id] = AddForgeNowRequestHistory.objects.filter(
request=request
).order_by("id")
return self.history[request.id]
def get_last_moderator(self, request):
last_history_with_moderator = (
self._gethistory(request).filter(actor_role="MODERATOR").last()
)
return (
last_history_with_moderator.actor if last_history_with_moderator else "None"
)
def get_last_modified_date(self, request):
last_history = self._gethistory(request).last()
return (
last_history.date.isoformat().replace("+00:00", "Z")
if last_history
else None
)
class AddForgeNowRequestPublicSerializer(serializers.ModelSerializer):
"""Serializes AddForgeRequest without private fields."""
class Meta:
model = AddForgeRequest
fields = ("id", "forge_url", "forge_type", "status", "submission_date")
class AddForgeNowRequestHistorySerializer(serializers.ModelSerializer):
class Meta:
model = AddForgeNowRequestHistory
exclude = ("request",)
class AddForgeNowRequestHistoryPublicSerializer(serializers.ModelSerializer):
class Meta:
model = AddForgeNowRequestHistory
fields = ("id", "date", "new_status", "actor_role")
@api_route(
r"/add-forge/request/create/",
"api-1-add-forge-request-create",
methods=["POST"],
)
@api_doc("/add-forge/request/create")
@format_docstring()
@transaction.atomic
def api_add_forge_request_create(request: Union[HttpRequest, Request]) -> HttpResponse:
"""
.. http:post:: /api/1/add-forge/request/create/
Create a new request to add a forge to the list of those crawled regularly
by Software Heritage.
.. warning::
That endpoint is not publicly available and requires authentication
in order to be able to request it.
{common_headers}
:[0-9]+)/update/",
"api-1-add-forge-request-update",
methods=["POST"],
)
@api_doc("/add-forge/request/update", tags=["hidden"])
@format_docstring()
@transaction.atomic
def api_add_forge_request_update(
request: Union[HttpRequest, Request], id: int
) -> HttpResponse:
"""
.. http:post:: /api/1/add-forge/request/update/
Update a request to add a forge to the list of those crawled regularly
by Software Heritage.
.. warning::
That endpoint is not publicly available and requires authentication
in order to be able to request it.
{common_headers}
:[0-9]+)/get/",
"api-1-add-forge-request-get",
methods=["GET"],
)
@api_doc("/add-forge/request/get")
@format_docstring()
def api_add_forge_request_get(request: Request, id: int):
"""
.. http:get:: /api/1/add-forge/request/get/
Return all details about an add-forge request.
{common_headers}
:param int id: add-forge request identifier
:statuscode 200: request details successfully returned
:statuscode 400: request identifier does not exist
"""
try:
add_forge_request = AddForgeRequest.objects.get(id=id)
except ObjectDoesNotExist:
raise BadInputExc("Request id does not exist")
request_history = AddForgeNowRequestHistory.objects.filter(
request=add_forge_request
).order_by("id")
if request.user.is_authenticated and request.user.has_perm(
ADD_FORGE_MODERATOR_PERMISSION
):
data = AddForgeNowRequestSerializer(add_forge_request).data
history = AddForgeNowRequestHistorySerializer(request_history, many=True).data
else:
data = AddForgeNowRequestPublicSerializer(add_forge_request).data
history = AddForgeNowRequestHistoryPublicSerializer(
request_history, many=True
).data
return {"request": data, "history": history}
diff --git a/swh/web/tests/add_forge_now/test_models.py b/swh/web/tests/add_forge_now/test_models.py
index 86e896f1..e9e617e2 100644
--- a/swh/web/tests/add_forge_now/test_models.py
+++ b/swh/web/tests/add_forge_now/test_models.py
@@ -1,26 +1,38 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
-from swh.web.add_forge_now.models import RequestStatus
+from swh.web.add_forge_now.models import Request, RequestStatus
@pytest.mark.parametrize(
"current_status, allowed_next_statuses",
[
(
RequestStatus.PENDING,
[
RequestStatus.WAITING_FOR_FEEDBACK,
RequestStatus.REJECTED,
RequestStatus.SUSPENDED,
],
),
(RequestStatus.WAITING_FOR_FEEDBACK, [RequestStatus.FEEDBACK_TO_HANDLE]),
],
)
def test_allowed_next_statuses(current_status, allowed_next_statuses):
assert current_status.allowed_next_statuses() == allowed_next_statuses
+
+
+@pytest.mark.parametrize(
+ "forge_url, expected_domain",
+ [
+ ("https://gitlab.example.com/foo/bar", "gitlab.example.com"),
+ ("gitlab.example.com", "gitlab.example.com"),
+ ("gitlab.example.com/foo/bar", "gitlab.example.com"),
+ ],
+)
+def test_request_forge_domain(forge_url, expected_domain):
+ assert Request(forge_url=forge_url).forge_domain == expected_domain
diff --git a/swh/web/tests/api/views/test_add_forge_now.py b/swh/web/tests/api/views/test_add_forge_now.py
index 538fa50f..c1843e87 100644
--- a/swh/web/tests/api/views/test_add_forge_now.py
+++ b/swh/web/tests/api/views/test_add_forge_now.py
@@ -1,564 +1,569 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import threading
import time
from typing import Dict
-from urllib.parse import urlencode
+from urllib.parse import urlencode, urlparse
import iso8601
import pytest
from swh.web.add_forge_now.models import Request
from swh.web.common.utils import reverse
from swh.web.config import get_config
from swh.web.inbound_email.utils import get_address_for_pk
from swh.web.tests.utils import (
check_api_get_responses,
check_api_post_response,
check_http_post_response,
)
@pytest.mark.django_db
def test_add_forge_request_create_anonymous_user(api_client):
url = reverse("api-1-add-forge-request-create")
check_api_post_response(api_client, url, status_code=403)
@pytest.mark.django_db
def test_add_forge_request_create_empty(api_client, regular_user):
api_client.force_login(regular_user)
url = reverse("api-1-add-forge-request-create")
resp = check_api_post_response(api_client, url, status_code=400)
assert '"forge_type"' in resp.data["reason"]
ADD_FORGE_DATA_FORGE1: Dict = {
"forge_type": "gitlab",
"forge_url": "https://gitlab.example.org",
"forge_contact_email": "admin@gitlab.example.org",
"forge_contact_name": "gitlab.example.org admin",
"forge_contact_comment": "user marked as owner in forge members",
"submitter_forward_username": True,
}
ADD_FORGE_DATA_FORGE2: Dict = {
"forge_type": "gitea",
"forge_url": "https://gitea.example.org",
"forge_contact_email": "admin@gitea.example.org",
"forge_contact_name": "gitea.example.org admin",
"forge_contact_comment": "user marked as owner in forge members",
"submitter_forward_username": True,
}
ADD_FORGE_DATA_FORGE3: Dict = {
"forge_type": "heptapod",
"forge_url": "https://heptapod.host/",
"forge_contact_email": "admin@example.org",
"forge_contact_name": "heptapod admin",
"forge_contact_comment": "", # authorized empty or null comment
"submitter_forward_username": False,
}
ADD_FORGE_DATA_FORGE4: Dict = {
**ADD_FORGE_DATA_FORGE3,
"forge_url": "https://heptapod2.host/",
"submitter_forward_username": "on",
}
ADD_FORGE_DATA_FORGE5: Dict = {
**ADD_FORGE_DATA_FORGE3,
"forge_url": "https://heptapod3.host/",
"submitter_forward_username": "off",
}
def inbound_email_for_pk(pk: int) -> str:
"""Check that the inbound email matches the one expected for the given pk"""
base_address = get_config()["add_forge_now"]["email_address"]
return get_address_for_pk(
salt="swh_web_add_forge_now", base_address=base_address, pk=pk
)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
@pytest.mark.parametrize(
"add_forge_data",
[
ADD_FORGE_DATA_FORGE1,
ADD_FORGE_DATA_FORGE2,
ADD_FORGE_DATA_FORGE3,
ADD_FORGE_DATA_FORGE4,
],
)
def test_add_forge_request_create_success_post(
api_client, regular_user, add_forge_data
):
api_client.force_login(regular_user)
url = reverse("api-1-add-forge-request-create")
date_before = datetime.datetime.now(tz=datetime.timezone.utc)
resp = check_api_post_response(
api_client,
url,
data=add_forge_data,
status_code=201,
)
date_after = datetime.datetime.now(tz=datetime.timezone.utc)
consent = add_forge_data["submitter_forward_username"]
# map the expected result with what's expectedly read from the db to ease comparison
expected_consent_bool = consent == "on" if isinstance(consent, str) else consent
assert resp.data == {
**add_forge_data,
"id": resp.data["id"],
"status": "PENDING",
"submission_date": resp.data["submission_date"],
"submitter_name": regular_user.username,
"submitter_email": regular_user.email,
"submitter_forward_username": expected_consent_bool,
"last_moderator": resp.data["last_moderator"],
"last_modified_date": resp.data["last_modified_date"],
"inbound_email_address": inbound_email_for_pk(resp.data["id"]),
+ "forge_domain": urlparse(add_forge_data["forge_url"]).netloc,
}
assert date_before < iso8601.parse_date(resp.data["submission_date"]) < date_after
request = Request.objects.all().last()
assert request.forge_url == add_forge_data["forge_url"]
assert request.submitter_name == regular_user.username
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_create_success_form_encoded(client, regular_user):
client.force_login(regular_user)
url = reverse("api-1-add-forge-request-create")
date_before = datetime.datetime.now(tz=datetime.timezone.utc)
resp = check_http_post_response(
client,
url,
request_content_type="application/x-www-form-urlencoded",
data=urlencode(ADD_FORGE_DATA_FORGE1),
status_code=201,
)
date_after = datetime.datetime.now(tz=datetime.timezone.utc)
assert resp.data == {
**ADD_FORGE_DATA_FORGE1,
"id": resp.data["id"],
"status": "PENDING",
"submission_date": resp.data["submission_date"],
"submitter_name": regular_user.username,
"submitter_email": regular_user.email,
"last_moderator": resp.data["last_moderator"],
"last_modified_date": resp.data["last_modified_date"],
"inbound_email_address": inbound_email_for_pk(1),
+ "forge_domain": urlparse(ADD_FORGE_DATA_FORGE1["forge_url"]).netloc,
}
assert date_before < iso8601.parse_date(resp.data["submission_date"]) < date_after
request = Request.objects.all()[0]
assert request.forge_url == ADD_FORGE_DATA_FORGE1["forge_url"]
assert request.submitter_name == regular_user.username
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_create_duplicate(api_client, regular_user):
api_client.force_login(regular_user)
url = reverse("api-1-add-forge-request-create")
check_api_post_response(
api_client,
url,
data=ADD_FORGE_DATA_FORGE1,
status_code=201,
)
check_api_post_response(
api_client,
url,
data=ADD_FORGE_DATA_FORGE1,
status_code=409,
)
requests = Request.objects.all()
assert len(requests) == 1
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_anonymous_user(api_client):
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(api_client, url, status_code=403)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_regular_user(api_client, regular_user):
api_client.force_login(regular_user)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(api_client, url, status_code=403)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_non_existent(api_client, add_forge_moderator):
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(api_client, url, status_code=400)
def create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE1):
api_client.force_login(regular_user)
url = reverse("api-1-add-forge-request-create")
return check_api_post_response(
api_client,
url,
data=data,
status_code=201,
)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_empty(api_client, regular_user, add_forge_moderator):
create_add_forge_request(api_client, regular_user)
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(api_client, url, status_code=400)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_missing_field(
api_client, regular_user, add_forge_moderator
):
create_add_forge_request(api_client, regular_user)
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(api_client, url, data={}, status_code=400)
check_api_post_response(
api_client, url, data={"new_status": "REJECTED"}, status_code=400
)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update(api_client, regular_user, add_forge_moderator):
create_add_forge_request(api_client, regular_user)
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(
api_client, url, data={"text": "updating request"}, status_code=200
)
check_api_post_response(
api_client,
url,
data={"new_status": "REJECTED", "text": "request rejected"},
status_code=200,
)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_invalid_new_status(
api_client, regular_user, add_forge_moderator
):
create_add_forge_request(api_client, regular_user)
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(
api_client,
url,
data={"new_status": "ACCEPTED", "text": "request accepted"},
status_code=400,
)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_status_concurrent(
api_client, regular_user, add_forge_moderator, mocker
):
_block_while_testing = mocker.patch(
"swh.web.api.views.add_forge_now._block_while_testing"
)
_block_while_testing.side_effect = lambda: time.sleep(1)
create_add_forge_request(api_client, regular_user)
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
worker_ended = False
def worker():
nonlocal worker_ended
check_api_post_response(
api_client,
url,
data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"},
status_code=200,
)
worker_ended = True
# this thread will first modify the request status to WAITING_FOR_FEEDBACK
thread = threading.Thread(target=worker)
thread.start()
# the other thread (slower) will attempt to modify the request status to REJECTED
# but it will not be allowed as the first faster thread already modified it
# and REJECTED state can not be reached from WAITING_FOR_FEEDBACK one
time.sleep(0.5)
check_api_post_response(
api_client,
url,
data={"new_status": "REJECTED", "text": "request accepted"},
status_code=400,
)
thread.join()
assert worker_ended
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_list_anonymous(api_client, regular_user):
url = reverse("api-1-add-forge-request-list")
resp = check_api_get_responses(api_client, url, status_code=200)
assert resp.data == []
create_add_forge_request(api_client, regular_user)
resp = check_api_get_responses(api_client, url, status_code=200)
add_forge_request = {
"forge_url": ADD_FORGE_DATA_FORGE1["forge_url"],
"forge_type": ADD_FORGE_DATA_FORGE1["forge_type"],
"status": "PENDING",
"submission_date": resp.data[0]["submission_date"],
"id": resp.data[0]["id"],
}
assert resp.data == [add_forge_request]
create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE2)
resp = check_api_get_responses(api_client, url, status_code=200)
other_forge_request = {
"forge_url": ADD_FORGE_DATA_FORGE2["forge_url"],
"forge_type": ADD_FORGE_DATA_FORGE2["forge_type"],
"status": "PENDING",
"submission_date": resp.data[0]["submission_date"],
"id": resp.data[0]["id"],
}
assert resp.data == [other_forge_request, add_forge_request]
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_list_moderator(
api_client, regular_user, add_forge_moderator
):
url = reverse("api-1-add-forge-request-list")
create_add_forge_request(api_client, regular_user)
create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE2)
api_client.force_login(add_forge_moderator)
resp = check_api_get_responses(api_client, url, status_code=200)
add_forge_request = {
**ADD_FORGE_DATA_FORGE1,
"status": "PENDING",
"submission_date": resp.data[1]["submission_date"],
"submitter_name": regular_user.username,
"submitter_email": regular_user.email,
"last_moderator": resp.data[1]["last_moderator"],
"last_modified_date": resp.data[1]["last_modified_date"],
"id": resp.data[1]["id"],
"inbound_email_address": inbound_email_for_pk(resp.data[1]["id"]),
+ "forge_domain": urlparse(ADD_FORGE_DATA_FORGE1["forge_url"]).netloc,
}
other_forge_request = {
**ADD_FORGE_DATA_FORGE2,
"status": "PENDING",
"submission_date": resp.data[0]["submission_date"],
"submitter_name": regular_user.username,
"submitter_email": regular_user.email,
"last_moderator": resp.data[0]["last_moderator"],
"last_modified_date": resp.data[0]["last_modified_date"],
"id": resp.data[0]["id"],
"inbound_email_address": inbound_email_for_pk(resp.data[0]["id"]),
+ "forge_domain": urlparse(ADD_FORGE_DATA_FORGE2["forge_url"]).netloc,
}
assert resp.data == [other_forge_request, add_forge_request]
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_list_pagination(
api_client, regular_user, api_request_factory
):
create_add_forge_request(api_client, regular_user)
create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE2)
url = reverse("api-1-add-forge-request-list", query_params={"per_page": 1})
resp = check_api_get_responses(api_client, url, 200)
assert len(resp.data) == 1
request = api_request_factory.get(url)
next_url = reverse(
"api-1-add-forge-request-list",
query_params={"page": 2, "per_page": 1},
request=request,
)
assert resp["Link"] == f'<{next_url}>; rel="next"'
resp = check_api_get_responses(api_client, next_url, 200)
assert len(resp.data) == 1
prev_url = reverse(
"api-1-add-forge-request-list",
query_params={"page": 1, "per_page": 1},
request=request,
)
assert resp["Link"] == f'<{prev_url}>; rel="previous"'
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_list_submitter_filtering(
api_client, regular_user, regular_user2
):
create_add_forge_request(api_client, regular_user)
create_add_forge_request(api_client, regular_user2, data=ADD_FORGE_DATA_FORGE2)
api_client.force_login(regular_user)
url = reverse(
"api-1-add-forge-request-list", query_params={"user_requests_only": 1}
)
resp = check_api_get_responses(api_client, url, status_code=200)
assert len(resp.data) == 1
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_get(api_client, regular_user, add_forge_moderator):
resp = create_add_forge_request(api_client, regular_user)
submission_date = resp.data["submission_date"]
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
api_client.force_login(add_forge_moderator)
check_api_post_response(
api_client,
url,
data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"},
status_code=200,
)
api_client.logout()
url = reverse("api-1-add-forge-request-get", url_args={"id": 1})
resp = check_api_get_responses(api_client, url, status_code=200)
assert resp.data == {
"request": {
"forge_url": ADD_FORGE_DATA_FORGE1["forge_url"],
"forge_type": ADD_FORGE_DATA_FORGE1["forge_type"],
"id": 1,
"status": "WAITING_FOR_FEEDBACK",
"submission_date": submission_date,
},
"history": [
{
"id": 1,
"actor_role": "SUBMITTER",
"date": resp.data["history"][0]["date"],
"new_status": "PENDING",
},
{
"id": 2,
"actor_role": "MODERATOR",
"date": resp.data["history"][1]["date"],
"new_status": "WAITING_FOR_FEEDBACK",
},
],
}
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_get_moderator(api_client, regular_user, add_forge_moderator):
resp = create_add_forge_request(api_client, regular_user)
submission_date = resp.data["submission_date"]
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
api_client.force_login(add_forge_moderator)
check_api_post_response(
api_client,
url,
data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"},
status_code=200,
)
url = reverse("api-1-add-forge-request-get", url_args={"id": 1})
resp = check_api_get_responses(api_client, url, status_code=200)
resp.data["history"] = [dict(history_item) for history_item in resp.data["history"]]
assert resp.data == {
"request": {
**ADD_FORGE_DATA_FORGE1,
"id": 1,
"status": "WAITING_FOR_FEEDBACK",
"submission_date": submission_date,
"submitter_name": regular_user.username,
"submitter_email": regular_user.email,
"last_moderator": add_forge_moderator.username,
"last_modified_date": resp.data["history"][1]["date"],
"inbound_email_address": inbound_email_for_pk(1),
+ "forge_domain": urlparse(ADD_FORGE_DATA_FORGE1["forge_url"]).netloc,
},
"history": [
{
"id": 1,
"text": "",
"actor": regular_user.username,
"actor_role": "SUBMITTER",
"date": resp.data["history"][0]["date"],
"new_status": "PENDING",
},
{
"id": 2,
"text": "waiting for message",
"actor": add_forge_moderator.username,
"actor_role": "MODERATOR",
"date": resp.data["history"][1]["date"],
"new_status": "WAITING_FOR_FEEDBACK",
},
],
}
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_get_invalid(api_client):
url = reverse("api-1-add-forge-request-get", url_args={"id": 3})
check_api_get_responses(api_client, url, status_code=400)