diff --git a/swh/deposit/tests/api/test_deposit.py b/swh/deposit/tests/api/test_deposit.py
index 81c78b3d..6ab0e05a 100644
--- a/swh/deposit/tests/api/test_deposit.py
+++ b/swh/deposit/tests/api/test_deposit.py
@@ -1,163 +1,168 @@
-# Copyright (C) 2017 The Software Heritage developers
+# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import hashlib
from django.core.urlresolvers import reverse
from io import BytesIO
from nose.tools import istest, nottest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.config import COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_REJECTED
from swh.deposit.config import DEPOSIT_STATUS_PARTIAL
from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS
from swh.deposit.config import DEPOSIT_STATUS_LOAD_FAILURE
from swh.deposit.models import Deposit, DepositClient, DepositCollection
from swh.deposit.parsers import parse_xml
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
class DepositNoAuthCase(APITestCase, BasicTestCase):
"""Deposit access are protected with basic authentication.
"""
@istest
def post_will_fail_with_401(self):
"""Without authentication, endpoint refuses access with 401 response
"""
url = reverse(COL_IRI, args=[self.collection.name])
# when
response = self.client.post(url)
# then
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
class DepositFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase,
CommonCreationRoutine):
"""Deposit access are protected with basic authentication.
"""
def setUp(self):
super().setUp()
# Add another user
_collection2 = DepositCollection(name='some')
_collection2.save()
_user = DepositClient.objects.create_user(username='user',
password='user')
_user.collections = [_collection2.id]
self.collection2 = _collection2
@istest
def access_to_another_user_collection_is_forbidden(self):
"""Access to another user collection should return a 403
"""
url = reverse(COL_IRI, args=[self.collection2.name])
response = self.client.post(url)
self.assertEqual(response.status_code,
status.HTTP_403_FORBIDDEN)
+ self.assertRegex(response.content.decode('utf-8'),
+ 'Client hal cannot access collection %s' % (
+ self.collection2.name, ))
@istest
def delete_on_col_iri_not_supported(self):
"""Delete on col iri should return a 405 response
"""
url = reverse(COL_IRI, args=[self.collection.name])
response = self.client.delete(url)
self.assertEqual(response.status_code,
status.HTTP_405_METHOD_NOT_ALLOWED)
+ self.assertRegex(response.content.decode('utf-8'),
+ 'DELETE method is not supported on this endpoint')
@nottest
def create_deposit_with_rejection_status(self):
url = reverse(COL_IRI, args=[self.collection.name])
data = b'some data which is clearly not a zip file'
md5sum = hashlib.md5(data).hexdigest()
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=data,
# + headers
CONTENT_LENGTH=len(data),
# other headers needs HTTP_ prefix to be taken into account
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=md5sum,
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
self.assertEquals(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
actual_state = response_content[
'{http://www.w3.org/2005/Atom}deposit_status']
self.assertEquals(actual_state, DEPOSIT_STATUS_REJECTED)
@istest
def act_on_deposit_rejected_is_not_permitted(self):
deposit_id = self.create_deposit_with_status(DEPOSIT_STATUS_REJECTED)
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.status == DEPOSIT_STATUS_REJECTED
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1,
HTTP_SLUG='external-id')
self.assertEquals(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertRegex(
response.content.decode('utf-8'),
"You can only act on deposit with status '%s'" % (
DEPOSIT_STATUS_PARTIAL, ))
@istest
def add_deposit_with_parent(self):
# given multiple deposit already loaded
deposit_id = self.create_deposit_with_status(
status=DEPOSIT_STATUS_LOAD_SUCCESS,
external_id='some-external-id')
deposit1 = Deposit.objects.get(pk=deposit_id)
self.assertIsNotNone(deposit1)
self.assertEquals(deposit1.external_id, 'some-external-id')
self.assertEquals(deposit1.status, DEPOSIT_STATUS_LOAD_SUCCESS)
deposit_id2 = self.create_deposit_with_status(
status=DEPOSIT_STATUS_LOAD_SUCCESS,
external_id='some-external-id')
deposit2 = Deposit.objects.get(pk=deposit_id2)
self.assertIsNotNone(deposit2)
self.assertEquals(deposit2.external_id, 'some-external-id')
self.assertEquals(deposit2.status, DEPOSIT_STATUS_LOAD_SUCCESS)
deposit_id3 = self.create_deposit_with_status(
status=DEPOSIT_STATUS_LOAD_FAILURE,
external_id='some-external-id')
deposit3 = Deposit.objects.get(pk=deposit_id3)
self.assertIsNotNone(deposit3)
self.assertEquals(deposit3.external_id, 'some-external-id')
self.assertEquals(deposit3.status, DEPOSIT_STATUS_LOAD_FAILURE)
# when
deposit_id3 = self.create_simple_deposit_partial(
external_id='some-external-id')
# then
deposit4 = Deposit.objects.get(pk=deposit_id3)
self.assertIsNotNone(deposit4)
self.assertEquals(deposit4.external_id, 'some-external-id')
self.assertEquals(deposit4.status, DEPOSIT_STATUS_PARTIAL)
self.assertEquals(deposit4.parent, deposit2)
diff --git a/swh/deposit/tests/api/test_deposit_multipart.py b/swh/deposit/tests/api/test_deposit_multipart.py
index cee1c2b6..c8f1408a 100644
--- a/swh/deposit/tests/api/test_deposit_multipart.py
+++ b/swh/deposit/tests/api/test_deposit_multipart.py
@@ -1,393 +1,402 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.core.urlresolvers import reverse
from io import BytesIO
from nose.tools import istest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.config import COL_IRI
from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.parsers import parse_xml
from ..common import BasicTestCase, WithAuthTestCase
from ..common import FileSystemCreationRoutine
class DepositMultipartTestCase(APITestCase, WithAuthTestCase, BasicTestCase,
FileSystemCreationRoutine):
"""Post multipart deposit scenario
"""
def setUp(self):
super().setUp()
self.data_atom_entry_ok = b"""
Title
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
2005-10-07T17:17:08Z
Contributor
The abstract
The abstract
Access Rights
Alternative Title
Date Available
Bibliographic Citation # noqa
Contributor
Description
Has Part
Has Version
Identifier
Is Part Of
Publisher
References
Rights Holder
Source
Title
Type
"""
self.data_atom_entry_update_in_place = """
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b
Title
Type
"""
@istest
def post_deposit_multipart_without_slug_header_is_bad_request(self):
# given
url = reverse(COL_IRI, args=[self.collection.name])
data_atom_entry = self.data_atom_entry_ok
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(
BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/zip',
size=len(archive_content),
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false')
self.assertIn(b'Missing SLUG header', response.content)
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
@istest
def post_deposit_multipart_zip(self):
"""one multipart deposit (zip+xml) should be accepted
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
# from django.core.files import uploadedfile
data_atom_entry = self.data_atom_entry_ok
archive = InMemoryUploadedFile(
BytesIO(self.archive['data']),
field_name=self.archive['name'],
name=self.archive['name'],
content_type='application/zip',
size=self.archive['length'],
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG=external_id)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
self.assertEquals(len(deposit_requests), 2)
for deposit_request in deposit_requests:
self.assertEquals(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
self.assertRegex(deposit_request.archive.name,
self.archive['name'])
else:
self.assertEquals(
deposit_request.metadata[
'{http://www.w3.org/2005/Atom}id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
@istest
def post_deposit_multipart_tar(self):
"""one multipart deposit (tar+xml) should be accepted
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
# from django.core.files import uploadedfile
data_atom_entry = self.data_atom_entry_ok
archive = InMemoryUploadedFile(
BytesIO(self.archive['data']),
field_name=self.archive['name'],
name=self.archive['name'],
content_type='application/x-tar',
size=self.archive['length'],
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG=external_id)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
self.assertEquals(len(deposit_requests), 2)
for deposit_request in deposit_requests:
self.assertEquals(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
self.assertRegex(deposit_request.archive.name,
self.archive['name'])
else:
self.assertEquals(
deposit_request.metadata[
'{http://www.w3.org/2005/Atom}id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
@istest
def post_deposit_multipart_put_to_replace_metadata(self):
"""One multipart deposit followed by a metadata update should be
accepted
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
data_atom_entry = self.data_atom_entry_ok
archive = InMemoryUploadedFile(
BytesIO(self.archive['data']),
field_name=self.archive['name'],
name=self.archive['name'],
content_type='application/zip',
size=self.archive['length'],
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='true',
HTTP_SLUG=external_id)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, 'partial')
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
self.assertEquals(len(deposit_requests), 2)
for deposit_request in deposit_requests:
self.assertEquals(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
self.assertRegex(deposit_request.archive.name,
self.archive['name'])
else:
self.assertEquals(
deposit_request.metadata[
'{http://www.w3.org/2005/Atom}id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
replace_metadata_uri = response._headers['location'][1]
response = self.client.put(
replace_metadata_uri,
content_type='application/atom+xml;type=entry',
data=self.data_atom_entry_update_in_place,
HTTP_IN_PROGRESS='false')
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
# deposit_id did not change
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
self.assertEquals(len(deposit_requests), 2)
for deposit_request in deposit_requests:
self.assertEquals(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
self.assertRegex(deposit_request.archive.name,
self.archive['name'])
else:
self.assertEquals(
deposit_request.metadata[
'{http://www.w3.org/2005/Atom}id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b')
# FAILURE scenarios
@istest
def post_deposit_multipart_only_archive_and_atom_entry(self):
"""Multipart deposit only accepts one archive and one atom+xml"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/x-tar',
size=len(archive_content),
charset=None)
other_archive_content = b"some-other-content"
other_archive = InMemoryUploadedFile(BytesIO(other_archive_content),
field_name='atom0',
name='atom0',
content_type='application/x-tar',
size=len(other_archive_content),
charset='utf-8')
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': other_archive,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG='external-id')
# then
self.assertEqual(response.status_code,
status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
+ self.assertTrue(
+ 'Only 1 application/zip (or application/x-tar) archive' in \
+ response.content.decode('utf-8'))
+
# when
archive.seek(0)
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG='external-id')
# then
self.assertEqual(response.status_code,
status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
+ self.assertTrue(
+ 'You must provide both 1 application/zip (or '
+ 'application/x-tar) and 1 atom+xml entry for '
+ 'multipart deposit' in response.content.decode('utf-8')
+ )
diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py
index cf911e7b..7966586e 100644
--- a/swh/deposit/tests/api/test_deposit_update.py
+++ b/swh/deposit/tests/api/test_deposit_update.py
@@ -1,342 +1,347 @@
-# Copyright (C) 2017 The Software Heritage developers
+# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.urlresolvers import reverse
from nose.tools import istest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.config import EDIT_SE_IRI, EM_IRI
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
from ..common import FileSystemCreationRoutine, create_arborescence_zip
class DepositUpdateOrReplaceExistingDataTest(
APITestCase, WithAuthTestCase, BasicTestCase,
FileSystemCreationRoutine, CommonCreationRoutine):
"""Try put/post (update/replace) query on EM_IRI
"""
def setUp(self):
super().setUp()
self.atom_entry_data1 = b"""
bar
"""
self.atom_entry_data1 = b"""
bar
"""
self.archive2 = create_arborescence_zip(
self.root_path, 'archive2', 'file2', b'some other content in file')
@istest
def replace_archive_to_deposit_is_possible(self):
"""Replace all archive with another one should return a 204 response
"""
# given
deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive'])
assert len(list(requests)) == 1
assert self.archive['name'] in requests[0].archive.name
# we have no metadata for that deposit
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
assert len(requests) == 0
deposit_id = self._update_deposit_with_status(deposit_id,
status_partial=True)
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
assert len(requests) == 1
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
external_id = 'some-external-id-1'
response = self.client.put(
update_uri,
content_type='application/zip', # as zip
data=self.archive2['data'],
# + headers
CONTENT_LENGTH=self.archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive2['name'], ))
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive'])
self.assertEquals(len(list(requests)), 1)
self.assertRegex(requests[0].archive.name, self.archive2['name'])
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
self.assertEquals(len(requests), 1)
@istest
def replace_metadata_to_deposit_is_possible(self):
"""Replace all metadata with another one should return a 204 response
"""
# given
deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata'])
assert len(list(requests)) == 0
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
assert len(requests) == 1
update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name,
deposit_id])
response = self.client.put(
update_uri,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata'])
self.assertEquals(len(list(requests)), 1)
metadata = requests[0].metadata
self.assertEquals(metadata["{http://www.w3.org/2005/Atom}foobar"],
'bar')
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
self.assertEquals(len(requests), 1)
@istest
def add_archive_to_deposit_is_possible(self):
"""Add another archive to a deposit return a 201 response
"""
# given
deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive'])
assert len(list(requests)) == 1
assert self.archive['name'] in requests[0].archive.name
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
assert len(requests) == 0
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
external_id = 'some-external-id-1'
response = self.client.post(
update_uri,
content_type='application/zip', # as zip
data=self.archive2['data'],
# + headers
CONTENT_LENGTH=self.archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive2['name'],))
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
requests = list(DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive']).order_by('id'))
self.assertEquals(len(requests), 2)
# first archive still exists
self.assertRegex(requests[0].archive.name, self.archive['name'])
# a new one was added
self.assertRegex(requests[1].archive.name, self.archive2['name'])
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
self.assertEquals(len(requests), 0)
@istest
def add_metadata_to_deposit_is_possible(self):
"""Add metadata with another one should return a 204 response
"""
# given
deposit_id = self.create_deposit_partial()
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata'])
assert len(list(requests)) == 2
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
assert len(requests) == 0
update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name,
deposit_id])
response = self.client.post(
update_uri,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata']).order_by('id')
self.assertEquals(len(list(requests)), 3)
# a new one was added
self.assertEquals(requests[1].metadata[
"{http://www.w3.org/2005/Atom}foobar"], 'bar')
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
self.assertEquals(len(requests), 0)
class DepositUpdateFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase,
CommonCreationRoutine):
"""Failure scenario about add/replace (post/put) query on deposit.
"""
@istest
def add_metadata_to_unknown_collection(self):
"""Replacing metadata to unknown deposit should return a 404 response
"""
- url = reverse(EDIT_SE_IRI,
- args=['unknown', 999]),
+ url = reverse(EDIT_SE_IRI, args=['test', 1000])
response = self.client.post(
url,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+ self.assertRegex(response.content.decode('utf-8'),
+ 'Unknown collection name test')
@istest
def add_metadata_to_unknown_deposit(self):
"""Replacing metadata to unknown deposit should return a 404 response
"""
- url = reverse(EDIT_SE_IRI,
- args=[self.collection.name, 999]),
+ url = reverse(EDIT_SE_IRI, args=[self.collection.name, 999])
response = self.client.post(
url,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+ self.assertRegex(response.content.decode('utf-8'),
+ 'Deposit with id 999 does not exist')
@istest
def replace_metadata_to_unknown_deposit(self):
"""Adding metadata to unknown deposit should return a 404 response
"""
- url = reverse(EDIT_SE_IRI,
- args=[self.collection.name, 999]),
+ url = reverse(EDIT_SE_IRI, args=[self.collection.name, 998])
response = self.client.put(
url,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+ self.assertRegex(response.content.decode('utf-8'),
+ 'Deposit with id 998 does not exist')
@istest
def add_archive_to_unknown_deposit(self):
"""Adding metadata to unknown deposit should return a 404 response
"""
- url = reverse(EM_IRI,
- args=[self.collection.name, 999]),
+ url = reverse(EM_IRI, args=[self.collection.name, 997])
response = self.client.post(
url,
content_type='application/zip',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+ self.assertRegex(response.content.decode('utf-8'),
+ 'Deposit with id 997 does not exist')
@istest
def replace_archive_to_unknown_deposit(self):
"""Replacing archive to unknown deposit should return a 404 response
"""
- url = reverse(EM_IRI,
- args=[self.collection.name, 999]),
+ url = reverse(EM_IRI, args=[self.collection.name, 996])
response = self.client.put(
url,
content_type='application/zip',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+ self.assertRegex(response.content.decode('utf-8'),
+ 'Deposit with id 996 does not exist')
@istest
def post_metadata_to_em_iri_failure(self):
"""Update (POST) archive with wrong content type should return 400
"""
deposit_id = self.create_deposit_partial() # only update on partial
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
response = self.client.post(
update_uri,
content_type='application/x-gtar-compressed',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertRegex(response.content.decode('utf-8'),
'Packaging format supported is restricted to '
'application/zip, application/x-tar')
@istest
def put_metadata_to_em_iri_failure(self):
"""Update (PUT) archive with wrong content type should return 400
"""
# given
deposit_id = self.create_deposit_partial() # only update on partial
# when
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
response = self.client.put(
update_uri,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
# then
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertRegex(response.content.decode('utf-8'),
'Packaging format supported is restricted to '
'application/zip, application/x-tar')