';
for (const info of saveRequestInfo) {
content +=
`
${info.key}
${info.value}
`;
}
content += '
';
}
$('.swh-popover').html(content);
$(event.target).popover('update');
}
export function fillSaveRequestFormAndScroll(visitType, originUrl) {
$('#swh-input-origin-url').val(originUrl);
let originTypeFound = false;
$('#swh-input-visit-type option').each(function() {
const val = $(this).val();
if (val && originUrl.includes(val)) {
$(this).prop('selected', true);
originTypeFound = true;
}
});
if (!originTypeFound) {
$('#swh-input-visit-type option').each(function() {
const val = $(this).val();
if (val === visitType) {
$(this).prop('selected', true);
}
});
}
window.scrollTo(0, 0);
}
diff --git a/assets/src/utils/functions.js b/assets/src/utils/functions.js
index 90960f1d..fa7231b4 100644
--- a/assets/src/utils/functions.js
+++ b/assets/src/utils/functions.js
@@ -1,184 +1,194 @@
/**
* Copyright (C) 2018-2022 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU Affero General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
// utility functions
import Cookies from 'js-cookie';
export function handleFetchError(response) {
if (!response.ok) {
throw response;
}
return response;
}
export function handleFetchErrors(responses) {
for (let i = 0; i < responses.length; ++i) {
if (!responses[i].ok) {
throw responses[i];
}
}
return responses;
}
export function errorMessageFromResponse(errorData, defaultMessage) {
let errorMessage = '';
try {
const reason = JSON.parse(errorData['reason']);
Object.entries(reason).forEach((keys, _) => {
const key = keys[0];
const message = keys[1][0]; // take only the first issue
errorMessage += `\n${key}: ${message}`;
});
} catch (_) {
errorMessage = errorData['reason']; // can't parse it, leave it raw
}
return errorMessage ? `Error: ${errorMessage}` : defaultMessage;
}
export function staticAsset(asset) {
return `${__STATIC__}${asset}`;
}
export function csrfPost(url, headers = {}, body = null) {
headers['X-CSRFToken'] = Cookies.get('csrftoken');
return fetch(url, {
credentials: 'include',
headers: headers,
method: 'POST',
body: body
});
}
export function isGitRepoUrl(url, pathPrefix = '/') {
const allowedProtocols = ['http:', 'https:', 'git:'];
if (allowedProtocols.find(protocol => protocol === url.protocol) === undefined) {
return false;
}
if (!url.pathname.startsWith(pathPrefix)) {
return false;
}
const re = new RegExp('[\\w\\.-]+\\/?(?!=.git)(?:\\.git\\/?)?$');
return re.test(url.pathname.slice(pathPrefix.length));
};
export function removeUrlFragment() {
history.replaceState('', document.title, window.location.pathname + window.location.search);
}
export function selectText(startNode, endNode) {
const selection = window.getSelection();
selection.removeAllRanges();
const range = document.createRange();
range.setStart(startNode, 0);
if (endNode.nodeName !== '#text') {
range.setEnd(endNode, endNode.childNodes.length);
} else {
range.setEnd(endNode, endNode.textContent.length);
}
selection.addRange(range);
}
export function htmlAlert(type, message, closable = false) {
let closeButton = '';
let extraClasses = '';
if (closable) {
closeButton =
``;
extraClasses = 'alert-dismissible';
}
return `
${message}${closeButton}
`;
}
-export function isValidURL(string) {
+export function validateUrl(url, allowedProtocols = []) {
+ let originUrl = null;
+ let validUrl = true;
+
try {
- new URL(string);
- } catch (_) {
- return false;
+ originUrl = new URL(url);
+ } catch (TypeError) {
+ validUrl = false;
}
- return true;
+
+ if (validUrl && allowedProtocols.length) {
+ validUrl = (
+ allowedProtocols.find(protocol => protocol === originUrl.protocol) !== undefined
+ );
+ }
+
+ return validUrl ? originUrl : null;
}
export async function isArchivedOrigin(originPath) {
- if (!isValidURL(originPath)) {
+ if (!validateUrl(originPath)) {
// Not a valid URL, return immediately
return false;
} else {
const response = await fetch(Urls.api_1_origin(originPath));
return response.ok && response.status === 200; // Success response represents an archived origin
}
}
async function getCanonicalGithubOriginURL(ownerRepo) {
const ghApiResponse = await fetch(`https://api.github.com/repos/${ownerRepo}`);
if (ghApiResponse.ok && ghApiResponse.status === 200) {
const ghApiResponseData = await ghApiResponse.json();
return ghApiResponseData.html_url;
}
}
export async function getCanonicalOriginURL(originUrl) {
let originUrlLower = originUrl.toLowerCase();
// github.com URL processing
const ghUrlRegex = /^http[s]*:\/\/github.com\//;
if (originUrlLower.match(ghUrlRegex)) {
// remove trailing .git
if (originUrlLower.endsWith('.git')) {
originUrlLower = originUrlLower.slice(0, -4);
}
// remove trailing slash
if (originUrlLower.endsWith('/')) {
originUrlLower = originUrlLower.slice(0, -1);
}
// extract {owner}/{repo}
const ownerRepo = originUrlLower.replace(ghUrlRegex, '');
// fetch canonical URL from github Web API
const url = await getCanonicalGithubOriginURL(ownerRepo);
if (url) {
return url;
}
}
const ghpagesUrlRegex = /^http[s]*:\/\/(?[^/]+).github.io\/(?[^/]+)\/?.*/;
const parsedUrl = originUrlLower.match(ghpagesUrlRegex);
if (parsedUrl) {
const ownerRepo = `${parsedUrl.groups.owner}/${parsedUrl.groups.repo}`;
// fetch canonical URL from github Web API
const url = await getCanonicalGithubOriginURL(ownerRepo);
if (url) {
return url;
}
}
return originUrl;
}
export function getHumanReadableDate(data) {
// Display iso format date string into a human readable date
// This is expected to be used by date field in datatable listing views
// Example: 3/24/2022, 10:31:08 AM
const date = new Date(data);
return date.toLocaleString();
}
export function genLink(sanitizedUrl, type, openInNewTab = false, linkText = '') {
// Display link. It's up to the caller to sanitize sanitizedUrl first.
if (type === 'display' && sanitizedUrl) {
const encodedSanitizedUrl = encodeURI(sanitizedUrl);
if (!linkText) {
linkText = encodedSanitizedUrl;
}
let attrs = '';
if (openInNewTab) {
attrs = 'target="_blank" rel="noopener noreferrer"';
}
return `${linkText}`;
}
return sanitizedUrl;
}
diff --git a/cypress/e2e/add-forge-now-request-create.cy.js b/cypress/e2e/add-forge-now-request-create.cy.js
index 47c8f6ed..27a45132 100644
--- a/cypress/e2e/add-forge-now-request-create.cy.js
+++ b/cypress/e2e/add-forge-now-request-create.cy.js
@@ -1,264 +1,276 @@
/**
* Copyright (C) 2022 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU Affero General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
function populateForm(type, url, contact, email, consent, comment) {
cy.get('#swh-input-forge-type').select(type);
cy.get('#swh-input-forge-url').clear().type(url);
cy.get('#swh-input-forge-contact-name').clear().type(contact);
cy.get('#swh-input-forge-contact-email').clear().type(email);
if (comment) {
cy.get('#swh-input-forge-comment').clear().type(comment);
}
cy.get('#swh-input-consent-check').click({force: consent === 'on'});
}
describe('Browse requests list tests', function() {
beforeEach(function() {
this.addForgeNowUrl = this.Urls.forge_add_create();
this.listAddForgeRequestsUrl = this.Urls.add_forge_request_list_datatables();
});
it('should not show user requests filter checkbox for anonymous users', function() {
cy.visit(this.addForgeNowUrl);
cy.get('#swh-add-forge-requests-list-tab').click();
cy.get('#swh-add-forge-user-filter').should('not.exist');
});
it('should show user requests filter checkbox for authenticated users', function() {
cy.userLogin();
cy.visit(this.addForgeNowUrl);
cy.get('#swh-add-forge-requests-list-tab').click();
cy.get('#swh-add-forge-user-filter').should('exist').should('be.checked');
});
it('should only display user requests when filter is activated', function() {
// Clean up previous state
cy.task('db:add_forge_now:delete');
// 'user2' logs in and create requests
cy.user2Login();
cy.visit(this.addForgeNowUrl);
// create requests for the user 'user'
- populateForm('gitlab', 'gitlab.org', 'admin', 'admin@example.org', 'on', '');
+ populateForm('gitlab', 'https://gitlab.org', 'admin', 'admin@example.org', 'on', '');
cy.get('#requestCreateForm').submit();
// user requests filter checkbox should be in the DOM
cy.get('#swh-add-forge-requests-list-tab').click();
cy.get('#swh-add-forge-user-filter').should('exist').should('be.checked');
// check unfiltered user requests
cy.get('tbody tr').then(rows => {
expect(rows.length).to.eq(1);
});
// user1 logout
cy.contains('a', 'logout').click();
// user logs in
cy.userLogin();
cy.visit(this.addForgeNowUrl);
- populateForm('gitea', 'gitea.org', 'admin', 'admin@example.org', 'on', '');
+ populateForm('gitea', 'https://gitea.org', 'admin', 'admin@example.org', 'on', '');
cy.get('#requestCreateForm').submit();
- populateForm('cgit', 'cgit.org', 'admin', 'admin@example.org', 'on', '');
+ populateForm('cgit', 'https://cgit.org', 'admin', 'admin@example.org', 'on', '');
cy.get('#requestCreateForm').submit();
// user requests filter checkbox should be in the DOM
cy.get('#swh-add-forge-requests-list-tab').click();
cy.get('#swh-add-forge-user-filter').should('exist').should('be.checked');
// check unfiltered user requests
cy.get('tbody tr').then(rows => {
expect(rows.length).to.eq(2);
});
cy.get('#swh-add-forge-user-filter')
.uncheck({force: true});
// Users now sees everything
cy.get('tbody tr').then(rows => {
expect(rows.length).to.eq(2 + 1);
});
});
});
describe('Test add-forge-request creation', function() {
beforeEach(function() {
this.addForgeNowUrl = this.Urls.forge_add_create();
});
it('should show all the tabs for every user', function() {
cy.visit(this.addForgeNowUrl);
cy.get('#swh-add-forge-tab')
.should('have.class', 'nav-link');
cy.get('#swh-add-forge-requests-list-tab')
.should('have.class', 'nav-link');
cy.get('#swh-add-forge-requests-help-tab')
.should('have.class', 'nav-link');
});
it('should show create forge tab by default', function() {
cy.visit(this.addForgeNowUrl);
cy.get('#swh-add-forge-tab')
.should('have.class', 'active');
cy.get('#swh-add-forge-requests-list-tab')
.should('not.have.class', 'active');
});
it('should show login link for anonymous user', function() {
cy.visit(this.addForgeNowUrl);
cy.get('#loginLink')
.should('be.visible')
.should('contain', 'log in');
});
it('should bring back after login', function() {
cy.visit(this.addForgeNowUrl);
cy.get('#loginLink')
.should('have.attr', 'href')
.and('include', `${this.Urls.login()}?next=${this.Urls.forge_add_create()}`);
});
it('should change tabs on click', function() {
cy.visit(this.addForgeNowUrl);
cy.get('#swh-add-forge-requests-list-tab').click();
cy.get('#swh-add-forge-tab')
.should('not.have.class', 'active');
cy.get('#swh-add-forge-requests-list-tab')
.should('have.class', 'active');
cy.get('#swh-add-forge-requests-help-tab')
.should('not.have.class', 'active');
cy.url()
.should('include', `${this.Urls.forge_add_list()}`);
cy.get('.swh-add-forge-now-item')
.should('have.class', 'active');
cy.get('#swh-add-forge-requests-help-tab').click();
cy.get('#swh-add-forge-tab')
.should('not.have.class', 'active');
cy.get('#swh-add-forge-requests-list-tab')
.should('not.have.class', 'active');
cy.get('#swh-add-forge-requests-help-tab')
.should('have.class', 'active');
cy.url()
.should('include', `${this.Urls.forge_add_help()}`);
cy.get('.swh-add-forge-now-item')
.should('have.class', 'active');
cy.get('#swh-add-forge-tab').click();
cy.get('#swh-add-forge-tab')
.should('have.class', 'active');
cy.get('#swh-add-forge-requests-list-tab')
.should('not.have.class', 'active');
cy.get('#swh-add-forge-requests-help-tab')
.should('not.have.class', 'active');
cy.url()
.should('include', `${this.Urls.forge_add_create()}`);
cy.get('.swh-add-forge-now-item')
.should('have.class', 'active');
});
it('should show create form elements to authenticated user', function() {
cy.userLogin();
cy.visit(this.addForgeNowUrl);
cy.get('#swh-input-forge-type')
.should('be.visible');
cy.get('#swh-input-forge-url')
.should('be.visible');
cy.get('#swh-input-forge-contact-name')
.should('be.visible');
cy.get('#swh-input-consent-check')
.should('be.visible');
cy.get('#swh-input-forge-comment')
.should('be.visible');
cy.get('#swh-input-form-submit')
.should('be.visible');
});
it('should show browse requests table for every user', function() {
// testing only for anonymous
cy.visit(this.addForgeNowUrl);
cy.get('#swh-add-forge-requests-list-tab').click();
cy.get('#add-forge-request-browse')
.should('be.visible');
cy.get('#loginLink')
.should('not.exist');
});
it('should update browse list on successful submission', function() {
cy.userLogin();
cy.visit(this.addForgeNowUrl);
- populateForm('bitbucket', 'gitlab.com', 'test', 'test@example.com', 'on', 'test comment');
+ populateForm('bitbucket', 'https://gitlab.com', 'test', 'test@example.com', 'on', 'test comment');
cy.get('#requestCreateForm').submit();
cy.visit(this.addForgeNowUrl);
cy.get('#swh-add-forge-requests-list-tab').click();
cy.get('#add-forge-request-browse')
.should('be.visible')
.should('contain', 'gitlab.com');
cy.get('#add-forge-request-browse')
.should('be.visible')
.should('contain', 'Pending');
});
it('should show error message on conflict', function() {
cy.userLogin();
cy.visit(this.addForgeNowUrl);
- populateForm('bitbucket', 'gitlab.com', 'test', 'test@example.com', 'on', 'test comment');
+ populateForm('bitbucket', 'https://gitlab.com', 'test', 'test@example.com', 'on', 'test comment');
cy.get('#requestCreateForm').submit();
cy.get('#requestCreateForm').submit(); // Submitting the same data again
cy.get('#userMessage')
.should('have.class', 'badge-danger')
.should('contain', 'already exists');
});
it('should show error message', function() {
cy.userLogin();
cy.intercept('POST', `${this.Urls.api_1_add_forge_request_create()}**`,
{
body: {
'exception': 'BadInputExc',
'reason': '{"add-forge-comment": ["This field is required"]}'
},
statusCode: 400
}).as('errorRequest');
cy.visit(this.addForgeNowUrl);
populateForm(
- 'bitbucket', 'gitlab.com', 'test', 'test@example.com', 'off', 'comment'
+ 'bitbucket', 'https://gitlab.com', 'test', 'test@example.com', 'off', 'comment'
);
cy.get('#requestCreateForm').submit();
cy.wait('@errorRequest').then((xhr) => {
cy.get('#userMessage')
.should('have.class', 'badge-danger')
.should('contain', 'field is required');
});
});
+ it('should bot validate form when forge URL is invalid', function() {
+ cy.userLogin();
+ cy.visit(this.addForgeNowUrl);
+ populateForm('bitbucket', 'bitbucket.org', 'test', 'test@example.com', 'on', 'test comment');
+ cy.get('#requestCreateForm').submit();
+
+ cy.get('#swh-input-forge-url')
+ .then(input => {
+ assert.isFalse(input[0].checkValidity());
+ });
+ });
+
});
diff --git a/swh/web/add_forge_now/migrations/0008_turn_request_forge_url_into_url_field.py b/swh/web/add_forge_now/migrations/0008_turn_request_forge_url_into_url_field.py
new file mode 100644
index 00000000..9ec0696a
--- /dev/null
+++ b/swh/web/add_forge_now/migrations/0008_turn_request_forge_url_into_url_field.py
@@ -0,0 +1,18 @@
+# Generated by Django 2.2.28 on 2022-08-16 13:08
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("swh_web_add_forge_now", "0007_rename_denied_request_status"),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name="request",
+ name="forge_url",
+ field=models.URLField(),
+ ),
+ ]
diff --git a/swh/web/add_forge_now/models.py b/swh/web/add_forge_now/models.py
index 6d6f85b7..14da6c9d 100644
--- a/swh/web/add_forge_now/models.py
+++ b/swh/web/add_forge_now/models.py
@@ -1,142 +1,142 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import enum
from typing import List
from urllib.parse import urlparse
from django.db import models
from ..config import get_config
from ..inbound_email.utils import get_address_for_pk
from .apps import APP_LABEL
class RequestStatus(enum.Enum):
"""Request statuses.
Values are used in the ui.
"""
PENDING = "Pending"
WAITING_FOR_FEEDBACK = "Waiting for feedback"
FEEDBACK_TO_HANDLE = "Feedback to handle"
ACCEPTED = "Accepted"
SCHEDULED = "Scheduled"
FIRST_LISTING_DONE = "First listing done"
FIRST_ORIGIN_LOADED = "First origin loaded"
REJECTED = "Rejected"
SUSPENDED = "Suspended"
UNSUCCESSFUL = "Unsuccessful"
@classmethod
def choices(cls):
return tuple((variant.name, variant.value) for variant in cls)
def allowed_next_statuses(self) -> List[RequestStatus]:
next_statuses = {
self.PENDING: [self.WAITING_FOR_FEEDBACK, self.REJECTED, self.SUSPENDED],
self.WAITING_FOR_FEEDBACK: [self.FEEDBACK_TO_HANDLE],
self.FEEDBACK_TO_HANDLE: [
self.WAITING_FOR_FEEDBACK,
self.ACCEPTED,
self.REJECTED,
self.SUSPENDED,
self.UNSUCCESSFUL,
],
self.ACCEPTED: [self.SCHEDULED],
self.SCHEDULED: [
self.FIRST_LISTING_DONE,
# in case of race condition between lister and loader:
self.FIRST_ORIGIN_LOADED,
],
self.FIRST_LISTING_DONE: [self.FIRST_ORIGIN_LOADED],
self.FIRST_ORIGIN_LOADED: [],
self.REJECTED: [],
self.SUSPENDED: [self.PENDING],
self.UNSUCCESSFUL: [],
}
return next_statuses[self] # type: ignore
class RequestActorRole(enum.Enum):
MODERATOR = "moderator"
SUBMITTER = "submitter"
FORGE_ADMIN = "forge admin"
EMAIL = "email"
@classmethod
def choices(cls):
return tuple((variant.name, variant.value) for variant in cls)
class RequestHistory(models.Model):
"""Comment or status change. This is commented or changed by either submitter or
moderator.
"""
request = models.ForeignKey("Request", models.DO_NOTHING)
text = models.TextField()
actor = models.TextField()
actor_role = models.TextField(choices=RequestActorRole.choices())
date = models.DateTimeField(auto_now_add=True)
new_status = models.TextField(choices=RequestStatus.choices(), null=True)
message_source = models.BinaryField(null=True)
class Meta:
app_label = APP_LABEL
db_table = "add_forge_request_history"
class Request(models.Model):
status = models.TextField(
choices=RequestStatus.choices(),
default=RequestStatus.PENDING.name,
)
submission_date = models.DateTimeField(auto_now_add=True)
submitter_name = models.TextField()
submitter_email = models.TextField()
submitter_forward_username = models.BooleanField(default=False)
# FIXME: shall we do create a user model inside the webapp instead?
forge_type = models.TextField()
- forge_url = models.TextField()
+ forge_url = models.URLField()
forge_contact_email = models.EmailField()
forge_contact_name = models.TextField()
forge_contact_comment = models.TextField(
null=True,
help_text="Where did you find this contact information (url, ...)",
)
last_moderator = models.TextField(default="None")
last_modified_date = models.DateTimeField(null=True)
class Meta:
app_label = APP_LABEL
db_table = "add_forge_request"
@property
def inbound_email_address(self) -> str:
"""Generate an email address for correspondence related to this request."""
base_address = get_config()["add_forge_now"]["email_address"]
return get_address_for_pk(salt=APP_LABEL, base_address=base_address, pk=self.pk)
@property
def forge_domain(self) -> str:
"""Get the domain/netloc out of the forge_url.
Fallback to using the first part of the url path, if the netloc can't be found
(for instance, if the url scheme hasn't been set).
"""
parsed_url = urlparse(self.forge_url)
domain = parsed_url.netloc
if not domain:
domain = parsed_url.path.split("/", 1)[0]
return domain
diff --git a/swh/web/templates/add_forge_now/creation_form.html b/swh/web/templates/add_forge_now/creation_form.html
index e6542ac5..6be8f222 100644
--- a/swh/web/templates/add_forge_now/creation_form.html
+++ b/swh/web/templates/add_forge_now/creation_form.html
@@ -1,129 +1,129 @@
{% extends "./common.html" %}
{% comment %}
Copyright (C) 2022 The Software Heritage developers
See the AUTHORS file at the top-level directory of this distribution
License: GNU Affero General Public License version 3, or any later version
See top-level LICENSE file for more information
{% endcomment %}
{% block tab_content %}
{% if not user.is_authenticated %}
You must be logged in to submit an add forge request. Please
log in
{% else %}
Once an add-forge-request is submitted, its status can be viewed in
the
submitted requests list. This process involves a moderator approval and
might take a few days to handle (it primarily depends on the response
time from the forge).
{% endif %}
{% endblock %}
diff --git a/swh/web/tests/add_forge_now/test_migration.py b/swh/web/tests/add_forge_now/test_migration.py
index df692242..30dfd01a 100644
--- a/swh/web/tests/add_forge_now/test_migration.py
+++ b/swh/web/tests/add_forge_now/test_migration.py
@@ -1,213 +1,252 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import pytest
from django.core.exceptions import ValidationError
from swh.web.add_forge_now.apps import APP_LABEL
MIGRATION_0001 = "0001_initial"
MIGRATION_0002 = "0002_authorized_null_comment"
MIGRATION_0003 = "0003_request_submitter_forward_username"
MIGRATION_0005 = "0005_prepare_inbound_email"
MIGRATION_0006 = "0006_request_add_new_fields"
MIGRATION_0007 = "0007_rename_denied_request_status"
+MIGRATION_0008 = "0008_turn_request_forge_url_into_url_field"
def now() -> datetime:
return datetime.now(tz=timezone.utc)
def test_add_forge_now_initial_migration(migrator):
"""Basic migration test to check the model is fine"""
state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0001))
request = state.apps.get_model(APP_LABEL, "Request")
request_history = state.apps.get_model(APP_LABEL, "RequestHistory")
from swh.web.add_forge_now.models import RequestActorRole, RequestStatus
date_now = now()
req = request(
status=RequestStatus.PENDING,
submitter_name="dudess",
submitter_email="dudess@orga.org",
forge_type="cgit",
forge_url="https://example.org/forge",
forge_contact_email="forge@//example.org",
forge_contact_name="forge",
forge_contact_comment=(
"Discovered on the main forge homepag, following contact link."
),
)
req.save()
assert req.submission_date > date_now
req_history = request_history(
request=req,
text="some comment from the moderator",
actor="moderator",
actor_role=RequestActorRole.MODERATOR,
new_status=None,
)
req_history.save()
assert req_history.date > req.submission_date
req_history2 = request_history(
request=req,
text="some answer from the user",
actor="user",
actor_role=RequestActorRole.SUBMITTER,
new_status=None,
)
req_history2.save()
assert req_history2.date > req_history.date
def test_add_forge_now_allow_no_comment(migrator):
"""Basic migration test to check new model authorized empty comment"""
from django.db.utils import IntegrityError
state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0001))
def make_request_with_empty_comment(requestModel):
return requestModel(
status="PENDING",
submitter_name="dudess",
submitter_email="dudess@orga.org",
forge_type="cgit",
forge_url="https://example.org/forge",
forge_contact_email="forge@//example.org",
forge_contact_name="forge",
forge_contact_comment=None,
)
requestModel = state.apps.get_model(APP_LABEL, "Request")
req = make_request_with_empty_comment(requestModel)
with pytest.raises(IntegrityError, match="violates not-null constraint"):
req.save()
state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0002))
requestModel2 = state.apps.get_model(APP_LABEL, "Request")
req2 = make_request_with_empty_comment(requestModel2)
req2.save()
def test_add_forge_now_store_submitter_forward_username(migrator):
state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0002))
requestModel = state.apps.get_model(APP_LABEL, "Request")
assert not hasattr(requestModel, "submitter_forward_username")
state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0003))
requestModel2 = state.apps.get_model(APP_LABEL, "Request")
assert hasattr(requestModel2, "submitter_forward_username")
def test_add_forge_now_add_new_fields_to_request(migrator):
state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0005))
Request = state.apps.get_model(APP_LABEL, "Request")
RequestHistory = state.apps.get_model(APP_LABEL, "RequestHistory")
assert not hasattr(Request, "last_moderator")
assert not hasattr(Request, "last_modified_date")
from swh.web.add_forge_now.models import RequestActorRole, RequestStatus
req = Request(
status=RequestStatus.PENDING,
submitter_name="dudess",
submitter_email="dudess@orga.org",
forge_type="cgit",
forge_url="https://example.org/forge",
forge_contact_email="forge@//example.org",
forge_contact_name="forge",
forge_contact_comment=(
"Discovered on the main forge homepag, following contact link."
),
)
req.save()
req_history = RequestHistory(
request=req,
text="some comment from the submitter",
actor="submitter",
actor_role=RequestActorRole.SUBMITTER.name,
new_status=None,
)
req_history.save()
req_history = RequestHistory(
request=req,
text="some comment from the moderator",
actor="moderator",
actor_role=RequestActorRole.MODERATOR.name,
new_status=None,
)
req_history.save()
state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0006))
Request = state.apps.get_model(APP_LABEL, "Request")
RequestHistory = state.apps.get_model(APP_LABEL, "RequestHistory")
assert hasattr(Request, "last_moderator")
assert hasattr(Request, "last_modified_date")
for request in Request.objects.all():
history = RequestHistory.objects.filter(request=request)
history = history.order_by("id")
assert request.last_modified_date == history.last().date
assert request.last_moderator == history.last().actor
def test_add_forge_now_denied_status_renamed_to_unsuccesful(migrator):
state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0006))
Request = state.apps.get_model(APP_LABEL, "Request")
from swh.web.add_forge_now.models import RequestStatus
req = Request(
status=RequestStatus.UNSUCCESSFUL.name,
submitter_name="dudess",
submitter_email="dudess@orga.org",
forge_type="cgit",
forge_url="https://example.org/forge",
forge_contact_email="forge@example.org",
forge_contact_name="forge",
forge_contact_comment=(
"Discovered on the main forge homepag, following contact link."
),
last_modified_date=datetime.now(timezone.utc),
)
with pytest.raises(ValidationError):
req.clean_fields()
state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0007))
Request = state.apps.get_model(APP_LABEL, "Request")
req = Request(
status=RequestStatus.UNSUCCESSFUL.name,
submitter_name="dudess",
submitter_email="dudess@orga.org",
forge_type="cgit",
forge_url="https://example.org/forge",
forge_contact_email="forge@example.org",
forge_contact_name="forge",
forge_contact_comment=(
"Discovered on the main forge homepag, following contact link."
),
last_modified_date=datetime.now(timezone.utc),
)
req.clean_fields()
+
+
+def test_add_forge_now_url_validation(migrator):
+
+ state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0007))
+ Request = state.apps.get_model(APP_LABEL, "Request")
+
+ from swh.web.add_forge_now.models import RequestStatus
+
+ request = Request(
+ status=RequestStatus.PENDING.name,
+ submitter_name="dudess",
+ submitter_email="dudess@orga.org",
+ forge_type="cgit",
+ forge_url="foo",
+ forge_contact_email="forge@example.org",
+ forge_contact_name="forge",
+ forge_contact_comment="bar",
+ last_modified_date=datetime.now(timezone.utc),
+ )
+ request.clean_fields()
+
+ state = migrator.apply_tested_migration((APP_LABEL, MIGRATION_0008))
+ Request = state.apps.get_model(APP_LABEL, "Request")
+
+ request = Request(
+ status=RequestStatus.PENDING.name,
+ submitter_name="johndoe",
+ submitter_email="johndoe@example.org",
+ forge_type="cgit",
+ forge_url="foobar",
+ forge_contact_email="forge@example.org",
+ forge_contact_name="forge",
+ forge_contact_comment="bar",
+ last_modified_date=datetime.now(timezone.utc),
+ )
+ with pytest.raises(ValidationError, match="Enter a valid URL."):
+ request.clean_fields()
diff --git a/swh/web/tests/api/views/test_add_forge_now.py b/swh/web/tests/api/views/test_add_forge_now.py
index adeb0ad1..9f938111 100644
--- a/swh/web/tests/api/views/test_add_forge_now.py
+++ b/swh/web/tests/api/views/test_add_forge_now.py
@@ -1,629 +1,651 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import copy
import datetime
import threading
import time
from typing import Dict
from urllib.parse import urlencode, urlparse
import iso8601
import pytest
from swh.web.add_forge_now.models import Request, RequestHistory
from swh.web.common.utils import reverse
from swh.web.config import get_config
from swh.web.inbound_email.utils import get_address_for_pk
from swh.web.tests.utils import (
check_api_get_responses,
check_api_post_response,
check_http_get_response,
check_http_post_response,
)
@pytest.mark.django_db
def test_add_forge_request_create_anonymous_user(api_client):
url = reverse("api-1-add-forge-request-create")
check_api_post_response(api_client, url, status_code=403)
@pytest.mark.django_db
def test_add_forge_request_create_empty(api_client, regular_user):
api_client.force_login(regular_user)
url = reverse("api-1-add-forge-request-create")
resp = check_api_post_response(api_client, url, status_code=400)
assert '"forge_type"' in resp.data["reason"]
ADD_FORGE_DATA_FORGE1: Dict = {
"forge_type": "gitlab",
"forge_url": "https://gitlab.example.org",
"forge_contact_email": "admin@gitlab.example.org",
"forge_contact_name": "gitlab.example.org admin",
"forge_contact_comment": "user marked as owner in forge members",
"submitter_forward_username": True,
}
ADD_FORGE_DATA_FORGE2: Dict = {
"forge_type": "gitea",
"forge_url": "https://gitea.example.org",
"forge_contact_email": "admin@gitea.example.org",
"forge_contact_name": "gitea.example.org admin",
"forge_contact_comment": "user marked as owner in forge members",
"submitter_forward_username": True,
}
ADD_FORGE_DATA_FORGE3: Dict = {
"forge_type": "heptapod",
"forge_url": "https://heptapod.host/",
"forge_contact_email": "admin@example.org",
"forge_contact_name": "heptapod admin",
"forge_contact_comment": "", # authorized empty or null comment
"submitter_forward_username": False,
}
ADD_FORGE_DATA_FORGE4: Dict = {
**ADD_FORGE_DATA_FORGE3,
"forge_url": "https://heptapod2.host/",
"submitter_forward_username": "on",
}
ADD_FORGE_DATA_FORGE5: Dict = {
**ADD_FORGE_DATA_FORGE3,
"forge_url": "https://heptapod3.host/",
"submitter_forward_username": "off",
}
def inbound_email_for_pk(pk: int) -> str:
"""Check that the inbound email matches the one expected for the given pk"""
base_address = get_config()["add_forge_now"]["email_address"]
return get_address_for_pk(
salt="swh_web_add_forge_now", base_address=base_address, pk=pk
)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
@pytest.mark.parametrize(
"add_forge_data",
[
ADD_FORGE_DATA_FORGE1,
ADD_FORGE_DATA_FORGE2,
ADD_FORGE_DATA_FORGE3,
ADD_FORGE_DATA_FORGE4,
],
)
def test_add_forge_request_create_success_post(
api_client, regular_user, add_forge_data
):
api_client.force_login(regular_user)
url = reverse("api-1-add-forge-request-create")
date_before = datetime.datetime.now(tz=datetime.timezone.utc)
resp = check_api_post_response(
api_client,
url,
data=add_forge_data,
status_code=201,
)
date_after = datetime.datetime.now(tz=datetime.timezone.utc)
consent = add_forge_data["submitter_forward_username"]
# map the expected result with what's expectedly read from the db to ease comparison
expected_consent_bool = consent == "on" if isinstance(consent, str) else consent
assert resp.data == {
**add_forge_data,
"id": resp.data["id"],
"status": "PENDING",
"submission_date": resp.data["submission_date"],
"submitter_name": regular_user.username,
"submitter_email": regular_user.email,
"submitter_forward_username": expected_consent_bool,
"last_moderator": resp.data["last_moderator"],
"last_modified_date": resp.data["last_modified_date"],
"inbound_email_address": inbound_email_for_pk(resp.data["id"]),
"forge_domain": urlparse(add_forge_data["forge_url"]).netloc,
}
assert date_before < iso8601.parse_date(resp.data["submission_date"]) < date_after
request = Request.objects.all().last()
assert request.forge_url == add_forge_data["forge_url"]
assert request.submitter_name == regular_user.username
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_create_success_form_encoded(client, regular_user):
client.force_login(regular_user)
url = reverse("api-1-add-forge-request-create")
date_before = datetime.datetime.now(tz=datetime.timezone.utc)
resp = check_http_post_response(
client,
url,
request_content_type="application/x-www-form-urlencoded",
data=urlencode(ADD_FORGE_DATA_FORGE1),
status_code=201,
)
date_after = datetime.datetime.now(tz=datetime.timezone.utc)
assert resp.data == {
**ADD_FORGE_DATA_FORGE1,
"id": resp.data["id"],
"status": "PENDING",
"submission_date": resp.data["submission_date"],
"submitter_name": regular_user.username,
"submitter_email": regular_user.email,
"last_moderator": resp.data["last_moderator"],
"last_modified_date": resp.data["last_modified_date"],
"inbound_email_address": inbound_email_for_pk(1),
"forge_domain": urlparse(ADD_FORGE_DATA_FORGE1["forge_url"]).netloc,
}
assert date_before < iso8601.parse_date(resp.data["submission_date"]) < date_after
request = Request.objects.all()[0]
assert request.forge_url == ADD_FORGE_DATA_FORGE1["forge_url"]
assert request.submitter_name == regular_user.username
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_create_duplicate(api_client, regular_user):
api_client.force_login(regular_user)
url = reverse("api-1-add-forge-request-create")
check_api_post_response(
api_client,
url,
data=ADD_FORGE_DATA_FORGE1,
status_code=201,
)
check_api_post_response(
api_client,
url,
data=ADD_FORGE_DATA_FORGE1,
status_code=409,
)
requests = Request.objects.all()
assert len(requests) == 1
+@pytest.mark.django_db(transaction=True, reset_sequences=True)
+def test_add_forge_request_create_invalid_forge_url(api_client, regular_user):
+ api_client.force_login(regular_user)
+ url = reverse("api-1-add-forge-request-create")
+
+ forge_data = copy.deepcopy(ADD_FORGE_DATA_FORGE1)
+ forge_data["forge_url"] = "foo"
+
+ resp = check_api_post_response(
+ api_client,
+ url,
+ data=forge_data,
+ status_code=400,
+ )
+
+ assert resp.data == {
+ "exception": "BadInputExc",
+ "reason": '{"forge_url": ["Enter a valid URL."]}',
+ }
+
+
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_anonymous_user(api_client):
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(api_client, url, status_code=403)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_regular_user(api_client, regular_user):
api_client.force_login(regular_user)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(api_client, url, status_code=403)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_non_existent(api_client, add_forge_moderator):
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(api_client, url, status_code=400)
def create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE1):
api_client.force_login(regular_user)
url = reverse("api-1-add-forge-request-create")
return check_api_post_response(
api_client,
url,
data=data,
status_code=201,
)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_empty(api_client, regular_user, add_forge_moderator):
create_add_forge_request(api_client, regular_user)
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(api_client, url, status_code=400)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_missing_field(
api_client, regular_user, add_forge_moderator
):
create_add_forge_request(api_client, regular_user)
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(api_client, url, data={}, status_code=400)
check_api_post_response(
api_client, url, data={"new_status": "REJECTED"}, status_code=400
)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update(api_client, regular_user, add_forge_moderator):
create_add_forge_request(api_client, regular_user)
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(
api_client, url, data={"text": "updating request"}, status_code=200
)
check_api_post_response(
api_client,
url,
data={"new_status": "REJECTED", "text": "request rejected"},
status_code=200,
)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_invalid_new_status(
api_client, regular_user, add_forge_moderator
):
create_add_forge_request(api_client, regular_user)
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
check_api_post_response(
api_client,
url,
data={"new_status": "ACCEPTED", "text": "request accepted"},
status_code=400,
)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_update_status_concurrent(
api_client, regular_user, add_forge_moderator, mocker
):
_block_while_testing = mocker.patch(
"swh.web.api.views.add_forge_now._block_while_testing"
)
_block_while_testing.side_effect = lambda: time.sleep(1)
create_add_forge_request(api_client, regular_user)
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
worker_ended = False
def worker():
nonlocal worker_ended
check_api_post_response(
api_client,
url,
data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"},
status_code=200,
)
worker_ended = True
# this thread will first modify the request status to WAITING_FOR_FEEDBACK
thread = threading.Thread(target=worker)
thread.start()
# the other thread (slower) will attempt to modify the request status to REJECTED
# but it will not be allowed as the first faster thread already modified it
# and REJECTED state can not be reached from WAITING_FOR_FEEDBACK one
time.sleep(0.5)
check_api_post_response(
api_client,
url,
data={"new_status": "REJECTED", "text": "request accepted"},
status_code=400,
)
thread.join()
assert worker_ended
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_list_anonymous(api_client, regular_user):
url = reverse("api-1-add-forge-request-list")
resp = check_api_get_responses(api_client, url, status_code=200)
assert resp.data == []
create_add_forge_request(api_client, regular_user)
resp = check_api_get_responses(api_client, url, status_code=200)
add_forge_request = {
"forge_url": ADD_FORGE_DATA_FORGE1["forge_url"],
"forge_type": ADD_FORGE_DATA_FORGE1["forge_type"],
"status": "PENDING",
"submission_date": resp.data[0]["submission_date"],
"id": resp.data[0]["id"],
}
assert resp.data == [add_forge_request]
create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE2)
resp = check_api_get_responses(api_client, url, status_code=200)
other_forge_request = {
"forge_url": ADD_FORGE_DATA_FORGE2["forge_url"],
"forge_type": ADD_FORGE_DATA_FORGE2["forge_type"],
"status": "PENDING",
"submission_date": resp.data[0]["submission_date"],
"id": resp.data[0]["id"],
}
assert resp.data == [other_forge_request, add_forge_request]
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_list_moderator(
api_client, regular_user, add_forge_moderator
):
url = reverse("api-1-add-forge-request-list")
create_add_forge_request(api_client, regular_user)
create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE2)
api_client.force_login(add_forge_moderator)
resp = check_api_get_responses(api_client, url, status_code=200)
add_forge_request = {
**ADD_FORGE_DATA_FORGE1,
"status": "PENDING",
"submission_date": resp.data[1]["submission_date"],
"submitter_name": regular_user.username,
"submitter_email": regular_user.email,
"last_moderator": resp.data[1]["last_moderator"],
"last_modified_date": resp.data[1]["last_modified_date"],
"id": resp.data[1]["id"],
"inbound_email_address": inbound_email_for_pk(resp.data[1]["id"]),
"forge_domain": urlparse(ADD_FORGE_DATA_FORGE1["forge_url"]).netloc,
}
other_forge_request = {
**ADD_FORGE_DATA_FORGE2,
"status": "PENDING",
"submission_date": resp.data[0]["submission_date"],
"submitter_name": regular_user.username,
"submitter_email": regular_user.email,
"last_moderator": resp.data[0]["last_moderator"],
"last_modified_date": resp.data[0]["last_modified_date"],
"id": resp.data[0]["id"],
"inbound_email_address": inbound_email_for_pk(resp.data[0]["id"]),
"forge_domain": urlparse(ADD_FORGE_DATA_FORGE2["forge_url"]).netloc,
}
assert resp.data == [other_forge_request, add_forge_request]
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_list_pagination(
api_client, regular_user, api_request_factory
):
create_add_forge_request(api_client, regular_user)
create_add_forge_request(api_client, regular_user, data=ADD_FORGE_DATA_FORGE2)
url = reverse("api-1-add-forge-request-list", query_params={"per_page": 1})
resp = check_api_get_responses(api_client, url, 200)
assert len(resp.data) == 1
request = api_request_factory.get(url)
next_url = reverse(
"api-1-add-forge-request-list",
query_params={"page": 2, "per_page": 1},
request=request,
)
assert resp["Link"] == f'<{next_url}>; rel="next"'
resp = check_api_get_responses(api_client, next_url, 200)
assert len(resp.data) == 1
prev_url = reverse(
"api-1-add-forge-request-list",
query_params={"page": 1, "per_page": 1},
request=request,
)
assert resp["Link"] == f'<{prev_url}>; rel="previous"'
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_list_submitter_filtering(
api_client, regular_user, regular_user2
):
create_add_forge_request(api_client, regular_user)
create_add_forge_request(api_client, regular_user2, data=ADD_FORGE_DATA_FORGE2)
api_client.force_login(regular_user)
url = reverse(
"api-1-add-forge-request-list", query_params={"user_requests_only": 1}
)
resp = check_api_get_responses(api_client, url, status_code=200)
assert len(resp.data) == 1
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_get(api_client, regular_user, add_forge_moderator):
resp = create_add_forge_request(api_client, regular_user)
submission_date = resp.data["submission_date"]
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
api_client.force_login(add_forge_moderator)
check_api_post_response(
api_client,
url,
data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"},
status_code=200,
)
api_client.logout()
url = reverse("api-1-add-forge-request-get", url_args={"id": 1})
resp = check_api_get_responses(api_client, url, status_code=200)
assert resp.data == {
"request": {
"forge_url": ADD_FORGE_DATA_FORGE1["forge_url"],
"forge_type": ADD_FORGE_DATA_FORGE1["forge_type"],
"id": 1,
"status": "WAITING_FOR_FEEDBACK",
"submission_date": submission_date,
},
"history": [
{
"id": 1,
"actor_role": "SUBMITTER",
"date": resp.data["history"][0]["date"],
"new_status": "PENDING",
},
{
"id": 2,
"actor_role": "MODERATOR",
"date": resp.data["history"][1]["date"],
"new_status": "WAITING_FOR_FEEDBACK",
},
],
}
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_get_moderator(api_client, regular_user, add_forge_moderator):
resp = create_add_forge_request(api_client, regular_user)
submission_date = resp.data["submission_date"]
url = reverse("api-1-add-forge-request-update", url_args={"id": 1})
api_client.force_login(add_forge_moderator)
check_api_post_response(
api_client,
url,
data={"new_status": "WAITING_FOR_FEEDBACK", "text": "waiting for message"},
status_code=200,
)
url = reverse("api-1-add-forge-request-get", url_args={"id": 1})
resp = check_api_get_responses(api_client, url, status_code=200)
resp.data["history"] = [dict(history_item) for history_item in resp.data["history"]]
assert resp.data == {
"request": {
**ADD_FORGE_DATA_FORGE1,
"id": 1,
"status": "WAITING_FOR_FEEDBACK",
"submission_date": submission_date,
"submitter_name": regular_user.username,
"submitter_email": regular_user.email,
"last_moderator": add_forge_moderator.username,
"last_modified_date": resp.data["history"][1]["date"],
"inbound_email_address": inbound_email_for_pk(1),
"forge_domain": urlparse(ADD_FORGE_DATA_FORGE1["forge_url"]).netloc,
},
"history": [
{
"id": 1,
"text": "",
"actor": regular_user.username,
"actor_role": "SUBMITTER",
"date": resp.data["history"][0]["date"],
"new_status": "PENDING",
"message_source_url": None,
},
{
"id": 2,
"text": "waiting for message",
"actor": add_forge_moderator.username,
"actor_role": "MODERATOR",
"date": resp.data["history"][1]["date"],
"new_status": "WAITING_FOR_FEEDBACK",
"message_source_url": None,
},
],
}
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_get_moderator_message_source(
api_client, regular_user, add_forge_moderator
):
resp = create_add_forge_request(api_client, regular_user)
rh = RequestHistory(
request=Request.objects.get(pk=resp.data["id"]),
new_status="WAITING_FOR_FEEDBACK",
text="waiting for message",
actor=add_forge_moderator.username,
actor_role="MODERATOR",
message_source=b"test with a message source",
)
rh.save()
api_client.force_login(add_forge_moderator)
url = reverse("api-1-add-forge-request-get", url_args={"id": resp.data["id"]})
resp = check_api_get_responses(api_client, url, status_code=200)
resp.data["history"] = [dict(history_item) for history_item in resp.data["history"]]
# Check that the authentified moderator can't urlhack non-existent message sources
assert resp.data["history"][0]["message_source_url"] is None
empty_message_url = reverse(
"forge-add-message-source", url_args={"id": resp.data["history"][0]["id"]}
)
check_http_get_response(api_client, empty_message_url, status_code=404)
# Check that the authentified moderator can't urlhack non-existent message sources
non_existent_message_url = reverse(
"forge-add-message-source", url_args={"id": 9001}
)
check_http_get_response(api_client, non_existent_message_url, status_code=404)
# Check that the authentified moderator can access the message source when the url is
# given
message_source_url = resp.data["history"][-1]["message_source_url"]
assert message_source_url is not None
message_source_resp = check_http_get_response(
api_client, message_source_url, status_code=200, content_type="text/email"
)
# Check that the message source shows up as an attachment
assert message_source_resp.content == rh.message_source
disposition = message_source_resp["Content-Disposition"]
assert disposition.startswith("attachment; filename=")
assert disposition.endswith('.eml"')
# Check that a regular user can't access message sources
api_client.force_login(regular_user)
check_http_get_response(api_client, message_source_url, status_code=302)
api_client.force_login(add_forge_moderator)
@pytest.mark.django_db(transaction=True, reset_sequences=True)
def test_add_forge_request_get_invalid(api_client):
url = reverse("api-1-add-forge-request-get", url_args={"id": 3})
check_api_get_responses(api_client, url, status_code=400)