diff --git a/swh/deposit/config.py b/swh/deposit/config.py
index 5235d82b..4f6758de 100644
--- a/swh/deposit/config.py
+++ b/swh/deposit/config.py
@@ -1,73 +1,76 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import logging
from swh.core.config import SWHConfig
# IRIs (Internationalized Resource identifier) sword 2.0 specified
EDIT_SE_IRI = 'edit_se_iri'
EM_IRI = 'em_iri'
CONT_FILE_IRI = 'cont_file_iri'
SD_IRI = 'servicedocument'
COL_IRI = 'upload'
STATE_IRI = 'state_iri'
PRIVATE_GET_RAW_CONTENT = 'private-download'
PRIVATE_PUT_DEPOSIT = 'private-update'
PRIVATE_GET_DEPOSIT_METADATA = 'private-read'
ARCHIVE_KEY = 'archive'
METADATA_KEY = 'metadata'
+ARCHIVE_TYPE = 'archive'
+METADATA_TYPE = 'metadata'
+
AUTHORIZED_PLATFORMS = ['development', 'production', 'testing']
DEPOSIT_STATUS_REJECTED = 'rejected'
DEPOSIT_STATUS_PARTIAL = 'partial'
DEPOSIT_STATUS_READY = 'ready'
DEPOSIT_STATUS_READY_FOR_CHECKS = 'ready-for-checks'
def setup_django_for(platform):
"""Setup function for command line tools (swh.deposit.create_user,
swh.deposit.scheduler.cli) to initialize the needed db access.
Note:
Do not import any django related module prior to this function
call. Otherwise, this will raise an
django.core.exceptions.ImproperlyConfigured error message.
Args:
platform (str): the platform the scheduling is running
Raises:
ValueError in case of wrong platform inputs.
"""
if platform not in AUTHORIZED_PLATFORMS:
raise ValueError('Platform should be one of %s' % AUTHORIZED_PLATFORMS)
os.environ.setdefault('DJANGO_SETTINGS_MODULE',
'swh.deposit.settings.%s' % platform)
import django
django.setup()
class SWHDefaultConfig(SWHConfig):
"""Mixin intended to enrich views with SWH configuration.
"""
CONFIG_BASE_FILENAME = 'deposit/server'
DEFAULT_CONFIG = {
'max_upload_size': ('int', 209715200),
}
def __init__(self, **config):
super().__init__()
self.config = self.parse_config_file()
self.config.update(config)
self.log = logging.getLogger('swh.deposit')
diff --git a/swh/deposit/signals.py b/swh/deposit/signals.py
index 1514d487..00144867 100644
--- a/swh/deposit/signals.py
+++ b/swh/deposit/signals.py
@@ -1,87 +1,83 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of defining some uncoupled actions on deposit.
Typically, checking that the archives deposited are ok are not
directly testing in the request/answer to avoid too long
computations.
So this is done in the deposit_on_status_ready_for_check callback.
"""
import zipfile
from django.db.models.signals import post_save
from django.dispatch import receiver
-from .models import Deposit, DepositRequest, DepositRequestType
+from .models import DepositRequest
from .config import DEPOSIT_STATUS_READY, DEPOSIT_STATUS_REJECTED
-from .config import DEPOSIT_STATUS_READY_FOR_CHECKS
+from .config import DEPOSIT_STATUS_READY_FOR_CHECKS, ARCHIVE_TYPE
-def checks(deposit):
- """Additional checks to execute on the deposit's associated data (archive).
- the status to ready for injection.
+def checks(deposit_request):
+ """Additional checks to execute on the deposit request's associated
+ data (archive).
Args:
- The deposit whose archives we need to check
+ The deposit request whose archive we need to check
Returns:
- True if every we can at least read some content to every
- deposit associated archive. False otherwise.
+ True if we can at least read some content to the
+ request's deposit associated archive. False otherwise.
"""
- archive_type = DepositRequestType.objects.filter(name='archive')
- requests = DepositRequest.objects.filter(deposit=deposit,
- type=archive_type)
+ if deposit_request.type.name != ARCHIVE_TYPE: # no check for other types
+ return True
try:
- for req in requests:
- archive = req.archive
- print('check %s' % archive.path)
- zf = zipfile.ZipFile(archive.path)
- zf.infolist()
+ archive = deposit_request.archive
+ zf = zipfile.ZipFile(archive.path)
+ zf.infolist()
except Exception as e:
- print(e)
return False
else:
return True
-@receiver(post_save, sender=Deposit)
+@receiver(post_save, sender=DepositRequest)
def deposit_on_status_ready_for_check(sender, instance, created, raw, using,
update_fields, **kwargs):
"""Check the status is ready for check.
If so, try and check the associated archives.
If not, move along.
When
Triggered when a deposit is saved.
Args:
- sender (Deposit): The model class
- instance (Deposit): The actual instance being saved
+ sender (DepositRequest): The model class
+ instance (DepositRequest): The actual instance being saved
created (bool): True if a new record was created
raw (bool): True if the model is saved exactly as presented
(i.e. when loading a fixture). One should not
query/modify other records in the database as the
database might not be in a consistent state yet
using: The database alias being used
update_fields: The set of fields to update as passed to
Model.save(), or None if update_fields wasn’t
passed to save()
"""
- if instance.status is not DEPOSIT_STATUS_READY_FOR_CHECKS:
+ if instance.deposit.status is not DEPOSIT_STATUS_READY_FOR_CHECKS:
return
+
if not checks(instance):
- instance.status = DEPOSIT_STATUS_REJECTED
+ instance.deposit.status = DEPOSIT_STATUS_REJECTED
else:
- instance.status = DEPOSIT_STATUS_READY
- print('Check ok: %s -> %s' % (instance.status, DEPOSIT_STATUS_READY))
+ instance.deposit.status = DEPOSIT_STATUS_READY
- instance.save()
+ instance.deposit.save()
diff --git a/swh/deposit/tests/api/test_deposit.py b/swh/deposit/tests/api/test_deposit.py
index 835c4c19..46d2c896 100644
--- a/swh/deposit/tests/api/test_deposit.py
+++ b/swh/deposit/tests/api/test_deposit.py
@@ -1,49 +1,119 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import hashlib
+
from django.core.urlresolvers import reverse
+from io import BytesIO
from nose.tools import istest
from rest_framework import status
from rest_framework.test import APITestCase
-from swh.deposit.config import COL_IRI
-from swh.deposit.models import DepositClient, DepositCollection
+from swh.deposit.config import COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_REJECTED
+from swh.deposit.config import DEPOSIT_STATUS_PARTIAL
+from swh.deposit.models import Deposit, DepositClient, DepositCollection
+from swh.deposit.parsers import parse_xml
+
+from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
+
+
+class DepositNoAuthCase(APITestCase, BasicTestCase):
+ """Deposit access are protected with basic authentication.
+
+ """
+ @istest
+ def post_will_fail_with_401(self):
+ """Without authentication, endpoint refuses access with 401 response
+
+ """
+ url = reverse(COL_IRI, args=[self.collection.name])
+
+ # when
+ response = self.client.post(url)
-from ..common import BasicTestCase, WithAuthTestCase
+ # then
+ self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
-class DepositFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase):
+class DepositFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase,
+ CommonCreationRoutine):
"""Deposit access are protected with basic authentication.
"""
def setUp(self):
super().setUp()
# Add another user
_collection2 = DepositCollection(name='some')
_collection2.save()
_user = DepositClient.objects.create_user(username='user',
password='user')
_user.collections = [_collection2.id]
self.collection2 = _collection2
@istest
def access_to_another_user_collection_is_forbidden(self):
"""Access to another user collection should return a 403
"""
url = reverse(COL_IRI, args=[self.collection2.name])
response = self.client.post(url)
self.assertEqual(response.status_code,
status.HTTP_403_FORBIDDEN)
@istest
def delete_on_col_iri_not_supported(self):
"""Delete on col iri should return a 405 response
"""
url = reverse(COL_IRI, args=[self.collection.name])
response = self.client.delete(url)
self.assertEqual(response.status_code,
status.HTTP_405_METHOD_NOT_ALLOWED)
+
+ @istest
+ def create_deposit_with_rejection_status(self):
+ url = reverse(COL_IRI, args=[self.collection.name])
+
+ data = b'some data which is clearly not a zip file'
+ md5sum = hashlib.md5(data).hexdigest()
+ external_id = 'some-external-id-1'
+
+ # when
+ response = self.client.post(
+ url,
+ content_type='application/zip', # as zip
+ data=data,
+ # + headers
+ CONTENT_LENGTH=len(data),
+ # other headers needs HTTP_ prefix to be taken into account
+ HTTP_SLUG=external_id,
+ HTTP_CONTENT_MD5=md5sum,
+ HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
+ HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
+
+ self.assertEquals(response.status_code, status.HTTP_201_CREATED)
+ response_content = parse_xml(BytesIO(response.content))
+ actual_state = response_content[
+ '{http://www.w3.org/2005/Atom}deposit_state']
+ self.assertEquals(actual_state, DEPOSIT_STATUS_REJECTED)
+
+ @istest
+ def act_on_deposit_rejected_is_not_permitted(self):
+ deposit_id = self.create_deposit_with_status_rejected()
+
+ deposit = Deposit.objects.get(pk=deposit_id)
+ assert deposit.status == DEPOSIT_STATUS_REJECTED
+
+ response = self.client.post(
+ reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
+ content_type='application/atom+xml;type=entry',
+ data=self.atom_entry_data1,
+ HTTP_SLUG='external-id')
+
+ self.assertEquals(response.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertRegex(
+ response.content.decode('utf-8'),
+ "You can only act on deposit with status '%s'" % (
+ DEPOSIT_STATUS_PARTIAL, ))
diff --git a/swh/deposit/tests/api/test_deposit_binary.py b/swh/deposit/tests/api/test_deposit_binary.py
index c652bff8..b5a1b75b 100644
--- a/swh/deposit/tests/api/test_deposit_binary.py
+++ b/swh/deposit/tests/api/test_deposit_binary.py
@@ -1,723 +1,660 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import hashlib
-import os
-import shutil
-
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.core.urlresolvers import reverse
from io import BytesIO
from nose.tools import istest
-from nose.plugins.attrib import attr
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.tests import TEST_CONFIG
from swh.deposit.config import COL_IRI, EM_IRI
from swh.deposit.config import DEPOSIT_STATUS_READY
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.parsers import parse_xml
from ..common import BasicTestCase, WithAuthTestCase, create_arborescence_zip
+from ..common import FileSystemCreationRoutine
-class DepositNoAuthCase(APITestCase, BasicTestCase):
- """Deposit access are protected with basic authentication.
-
- """
- @istest
- def post_will_fail_with_401(self):
- """Without authentication, endpoint refuses access with 401 response
-
- """
- url = reverse(COL_IRI, args=[self.collection.name])
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
-
- external_id = 'some-external-id-1'
-
- # when
- response = self.client.post(
- url,
- content_type='application/zip', # as zip
- data=data_text,
- # + headers
- CONTENT_LENGTH=len(data_text),
- HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum,
- HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
- HTTP_IN_PROGRESS='false',
- HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
-
- # then
- self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
-
-
-@attr('fs')
-class DepositTestCase(APITestCase, WithAuthTestCase, BasicTestCase):
+class DepositTestCase(APITestCase, WithAuthTestCase, BasicTestCase,
+ FileSystemCreationRoutine):
"""Try and upload one single deposit
"""
def setUp(self):
super().setUp()
self.atom_entry_data0 = b"""
Awesome Compiler
hal
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
%s
2017-10-07T15:17:08Z
some awesome author
something
awesome-compiler
This is an awesome compiler destined to
awesomely compile stuff
and other stuff
compiler,programming,language
2005-10-07T17:17:08Z
2005-10-07T17:17:08Z
release note
related link
Awesome
https://hoster.org/awesome-compiler
GNU/Linux
0.0.1
running
all
"""
self.atom_entry_data1 = b"""
hal
urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a
2017-10-07T15:17:08Z
some awesome author
something
awesome-compiler
This is an awesome compiler destined to
awesomely compile stuff
and other stuff
compiler,programming,language
2005-10-07T17:17:08Z
2005-10-07T17:17:08Z
release note
related link
Awesome
https://hoster.org/awesome-compiler
GNU/Linux
0.0.1
running
all
"""
self.atom_entry_data2 = b"""
%s
"""
self.atom_entry_data_empty_body = b"""
"""
self.atom_entry_data3 = b"""
something
"""
self.data_atom_entry_ok = b"""
Title
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
2005-10-07T17:17:08Z
Contributor
The abstract
The abstract
Access Rights
Alternative Title
Date Available
Bibliographic Citation # noqa
Contributor
Description
Has Part
Has Version
Identifier
Is Part Of
Publisher
References
Rights Holder
Source
Title
Type
"""
- self.root_path = '/tmp/swh-deposit/test/build-zip2/'
- os.makedirs(self.root_path, exist_ok=True)
-
- self.archive = create_arborescence_zip(
- self.root_path, 'archive1', 'file1', b'some content in file')
-
- def tearDown(self):
- shutil.rmtree(self.root_path)
-
@istest
def post_deposit_binary_without_slug_header_is_bad_request(self):
"""Posting a binary deposit without slug header should return 400
"""
url = reverse(COL_IRI, args=[self.collection.name])
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
- data=data_text,
+ data=self.archive['data'],
# + headers
- CONTENT_LENGTH=len(data_text),
- HTTP_CONTENT_MD5=md5sum,
+ CONTENT_LENGTH=self.archive['length'],
+ HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
self.assertIn(b'Missing SLUG header', response.content)
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
@istest
def post_deposit_binary_upload_final_and_status_check(self):
"""Binary upload with correct headers should return 201 with receipt
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
# other headers needs HTTP_ prefix to be taken into account
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive['name'], ))
# then
response_content = parse_xml(BytesIO(response.content))
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_READY)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_request = DepositRequest.objects.get(deposit=deposit)
self.assertEquals(deposit_request.deposit, deposit)
self.assertRegex(deposit_request.archive.name, self.archive['name'])
response_content = parse_xml(BytesIO(response.content))
self.assertEqual(
response_content['{http://www.w3.org/2005/Atom}deposit_archive'],
self.archive['name'])
self.assertEqual(
response_content['{http://www.w3.org/2005/Atom}deposit_id'],
deposit.id)
self.assertEqual(
response_content['{http://www.w3.org/2005/Atom}deposit_state'],
deposit.status)
edit_se_iri = reverse('edit_se_iri',
args=[self.collection.name, deposit.id])
self.assertEqual(response._headers['location'],
('Location', 'http://testserver' + edit_se_iri))
@istest
def post_deposit_binary_upload_only_supports_zip(self):
"""Binary upload without content_type application/zip should return 415
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/octet-stream',
- data=data_text,
+ data=self.archive['data'],
# + headers
- CONTENT_LENGTH=len(data_text),
+ CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum,
+ HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code,
status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
@istest
def post_deposit_binary_fails_if_unsupported_packaging_header(
self):
"""Bin deposit without supported content_disposition header returns 400
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
external_id = 'some-external-id'
# when
response = self.client.post(
url,
content_type='application/zip',
- data=data_text,
+ data=self.archive['data'],
# + headers
- CONTENT_LENGTH=len(data_text),
+ CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum,
+ HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='something-unsupported',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
@istest
def post_deposit_binary_upload_fail_if_no_content_disposition_header(
self):
"""Binary upload without content_disposition header should return 400
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
external_id = 'some-external-id'
# when
response = self.client.post(
url,
content_type='application/zip',
- data=data_text,
+ data=self.archive['data'],
# + headers
- CONTENT_LENGTH=len(data_text),
+ CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum,
+ HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false')
# then
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
@istest
def post_deposit_mediation_not_supported(self):
"""Binary upload with mediation should return a 412 response
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/zip',
- data=data_text,
+ data=self.archive['data'],
# + headers
- CONTENT_LENGTH=len(data_text),
+ CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum,
+ HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_ON_BEHALF_OF='someone',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code,
status.HTTP_412_PRECONDITION_FAILED)
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
@istest
def post_deposit_binary_upload_fail_if_upload_size_limit_exceeded(
self):
"""Binary upload must not exceed the limit set up...
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
archive = create_arborescence_zip(
self.root_path, 'archive2', 'file2', b'some content in file',
up_to_size=TEST_CONFIG['max_upload_size'])
external_id = 'some-external-id'
# when
response = self.client.post(
url,
content_type='application/zip',
data=archive['data'],
# + headers
CONTENT_LENGTH=archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
- print(response.content)
self.assertEqual(response.status_code,
status.HTTP_413_REQUEST_ENTITY_TOO_LARGE)
self.assertRegex(response.content, b'Upload size limit exceeded')
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
@istest
def post_deposit_2_post_2_different_deposits(self):
"""2 posting deposits should return 2 different 201 with receipt
"""
url = reverse(COL_IRI, args=[self.collection.name])
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
- data=data_text,
+ data=self.archive['data'],
# + headers
- CONTENT_LENGTH=len(data_text),
+ CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG='some-external-id-1',
- HTTP_CONTENT_MD5=md5sum,
+ HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
deposits = Deposit.objects.all()
self.assertEqual(len(deposits), 1)
self.assertEqual(deposits[0], deposit)
# second post
response = self.client.post(
url,
content_type='application/zip', # as zip
- data=data_text,
+ data=self.archive['data'],
# + headers
- CONTENT_LENGTH=len(data_text),
+ CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG='another-external-id',
- HTTP_CONTENT_MD5=md5sum,
+ HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename1')
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id2 = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
deposit2 = Deposit.objects.get(pk=deposit_id2)
self.assertNotEqual(deposit, deposit2)
deposits = Deposit.objects.all().order_by('id')
self.assertEqual(len(deposits), 2)
self.assertEqual(list(deposits), [deposit, deposit2])
@istest
def post_deposit_binary_and_post_to_add_another_archive(self):
"""Updating a deposit should return a 201 with receipt
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='true',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive['name'], ))
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, 'partial')
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_request = DepositRequest.objects.get(deposit=deposit)
self.assertEquals(deposit_request.deposit, deposit)
self.assertEquals(deposit_request.type.name, 'archive')
self.assertRegex(deposit_request.archive.name, self.archive['name'])
# 2nd archive to upload
archive2 = create_arborescence_zip(
self.root_path, 'archive2', 'file2', b'some other content in file')
- import os
- print('exists?', os.path.exists(archive2['path']))
-
# uri to update the content
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
# adding another archive for the deposit and finalizing it
response = self.client.post(
update_uri,
content_type='application/zip', # as zip
data=archive2['data'],
# + headers
CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
archive2['name']))
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
- print(response_content)
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_READY)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = list(DepositRequest.objects.filter(deposit=deposit).
order_by('id'))
# 2 deposit requests for the same deposit
self.assertEquals(len(deposit_requests), 2)
self.assertEquals(deposit_requests[0].deposit, deposit)
self.assertEquals(deposit_requests[0].type.name, 'archive')
self.assertRegex(deposit_requests[0].archive.name,
self.archive['name'])
self.assertEquals(deposit_requests[1].deposit, deposit)
self.assertEquals(deposit_requests[1].type.name, 'archive')
self.assertRegex(deposit_requests[1].archive.name,
archive2['name'])
# only 1 deposit in db
deposits = Deposit.objects.all()
self.assertEqual(len(deposits), 1)
@istest
def post_deposit_then_post_or_put_is_refused_when_status_ready(self):
"""Updating a deposit with status 'ready' should return a 400
"""
url = reverse(COL_IRI, args=[self.collection.name])
external_id = 'some-external-id-1'
- # 1st archive to upload
- data_text0 = b'some other content'
- md5sum0 = hashlib.md5(data_text0).hexdigest()
-
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
- data=data_text0,
+ data=self.archive['data'],
# + headers
- CONTENT_LENGTH=len(data_text0),
+ CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum0,
+ HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_READY)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_request = DepositRequest.objects.get(deposit=deposit)
self.assertEquals(deposit_request.deposit, deposit)
self.assertRegex(deposit_request.archive.name, 'filename0')
# updating/adding is forbidden
# uri to update the content
edit_se_iri = reverse(
'edit_se_iri', args=[self.collection.name, deposit_id])
em_iri = reverse(
'em_iri', args=[self.collection.name, deposit_id])
# Testing all update/add endpoint should fail
# since the status is ready
+ archive2 = create_arborescence_zip(
+ self.root_path, 'archive2', 'file2', b'some content in file 2')
+
# replacing file is no longer possible since the deposit's
# status is ready
r = self.client.put(
em_iri,
content_type='application/zip',
- data=data_text0,
- CONTENT_LENGTH=len(data_text0),
+ data=archive2['data'],
+ CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum0,
+ HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
# adding file is no longer possible since the deposit's status
# is ready
r = self.client.post(
em_iri,
content_type='application/zip',
- data=data_text0,
- CONTENT_LENGTH=len(data_text0),
+ data=archive2['data'],
+ CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum0,
+ HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
# replacing metadata is no longer possible since the deposit's
# status is ready
r = self.client.put(
edit_se_iri,
content_type='application/atom+xml;type=entry',
data=self.data_atom_entry_ok,
CONTENT_LENGTH=len(self.data_atom_entry_ok),
HTTP_SLUG=external_id)
self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
# adding new metadata is no longer possible since the
# deposit's status is ready
r = self.client.post(
edit_se_iri,
content_type='application/atom+xml;type=entry',
data=self.data_atom_entry_ok,
CONTENT_LENGTH=len(self.data_atom_entry_ok),
HTTP_SLUG=external_id)
self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(
BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/zip',
size=len(archive_content),
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(self.data_atom_entry_ok),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(self.data_atom_entry_ok),
charset='utf-8')
# replacing multipart metadata is no longer possible since the
# deposit's status is ready
r = self.client.put(
edit_se_iri,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
})
self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
# adding new metadata is no longer possible since the
# deposit's status is ready
r = self.client.post(
edit_se_iri,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
})
self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
diff --git a/swh/deposit/tests/api/test_deposit_delete.py b/swh/deposit/tests/api/test_deposit_delete.py
index 13802050..d7d60276 100644
--- a/swh/deposit/tests/api/test_deposit_delete.py
+++ b/swh/deposit/tests/api/test_deposit_delete.py
@@ -1,118 +1,119 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.urlresolvers import reverse
from nose.tools import istest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.config import EDIT_SE_IRI, EM_IRI, ARCHIVE_KEY, METADATA_KEY
from swh.deposit.config import DEPOSIT_STATUS_READY
from swh.deposit.models import Deposit, DepositRequest
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
class DepositDeleteTest(APITestCase, WithAuthTestCase, BasicTestCase,
CommonCreationRoutine):
@istest
def delete_archive_on_partial_deposit_works(self):
"""Removing partial deposit's archive should return a 204 response
"""
# given
deposit_id = self.create_deposit_partial()
deposit = Deposit.objects.get(pk=deposit_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
self.assertEquals(len(deposit_requests), 2)
for dr in deposit_requests:
if dr.type.name == ARCHIVE_KEY:
continue
elif dr.type.name == METADATA_KEY:
continue
else:
self.fail('only archive and metadata type should exist '
'in this test context')
# when
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
response = self.client.delete(update_uri)
# then
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
deposit = Deposit.objects.get(pk=deposit_id)
requests = list(DepositRequest.objects.filter(deposit=deposit))
- self.assertEquals(len(requests), 1)
+ self.assertEquals(len(requests), 2)
self.assertEquals(requests[0].type.name, 'metadata')
+ self.assertEquals(requests[1].type.name, 'metadata')
@istest
def delete_archive_on_undefined_deposit_fails(self):
"""Delete undefined deposit returns a 404 response
"""
# when
update_uri = reverse(EM_IRI, args=[self.collection.name, 999])
response = self.client.delete(update_uri)
# then
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
@istest
def delete_archive_on_non_partial_deposit_fails(self):
"""Delete !partial status deposit should return a 400 response"""
deposit_id = self.create_deposit_ready()
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.status == DEPOSIT_STATUS_READY
# when
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
response = self.client.delete(update_uri)
# then
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
deposit = Deposit.objects.get(pk=deposit_id)
self.assertIsNotNone(deposit)
@istest
def delete_partial_deposit_works(self):
"""Delete deposit should return a 204 response
"""
# given
deposit_id = self.create_simple_deposit_partial()
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.id == deposit_id
# when
url = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id])
response = self.client.delete(url)
# then
self.assertEqual(response.status_code,
status.HTTP_204_NO_CONTENT)
deposit_requests = list(DepositRequest.objects.filter(deposit=deposit))
self.assertEquals(deposit_requests, [])
deposits = list(Deposit.objects.filter(pk=deposit_id))
self.assertEquals(deposits, [])
@istest
def delete_on_edit_se_iri_cannot_delete_non_partial_deposit(self):
"""Delete !partial deposit should return a 400 response
"""
# given
deposit_id = self.create_deposit_ready()
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.id == deposit_id
# when
url = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id])
response = self.client.delete(url)
# then
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
deposit = Deposit.objects.get(pk=deposit_id)
self.assertIsNotNone(deposit)
diff --git a/swh/deposit/tests/api/test_deposit_multipart.py b/swh/deposit/tests/api/test_deposit_multipart.py
index 509c34f4..c39cda7f 100644
--- a/swh/deposit/tests/api/test_deposit_multipart.py
+++ b/swh/deposit/tests/api/test_deposit_multipart.py
@@ -1,338 +1,326 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import os
-import shutil
-
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.core.urlresolvers import reverse
from io import BytesIO
from nose.tools import istest
-from nose.plugins.attrib import attr
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_READY
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.parsers import parse_xml
-from ..common import BasicTestCase, WithAuthTestCase, create_arborescence_zip
+from ..common import BasicTestCase, WithAuthTestCase
+from ..common import FileSystemCreationRoutine
-@attr('fs')
-class DepositMultipartTestCase(APITestCase, WithAuthTestCase, BasicTestCase):
+class DepositMultipartTestCase(APITestCase, WithAuthTestCase, BasicTestCase,
+ FileSystemCreationRoutine):
"""Post multipart deposit scenario
"""
def setUp(self):
super().setUp()
self.data_atom_entry_ok = b"""
Title
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
2005-10-07T17:17:08Z
Contributor
The abstract
The abstract
Access Rights
Alternative Title
Date Available
Bibliographic Citation # noqa
Contributor
Description
Has Part
Has Version
Identifier
Is Part Of
Publisher
References
Rights Holder
Source
Title
Type
"""
self.data_atom_entry_update_in_place = """
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b
Title
Type
"""
- self.root_path = '/tmp/swh-deposit/test/build-zip2/'
- os.makedirs(self.root_path, exist_ok=True)
-
- self.archive = create_arborescence_zip(
- self.root_path, 'archive1', 'file1', b'some content in file')
-
- def tearDown(self):
- shutil.rmtree(self.root_path)
-
@istest
def post_deposit_multipart_without_slug_header_is_bad_request(self):
# given
url = reverse(COL_IRI, args=[self.collection.name])
data_atom_entry = self.data_atom_entry_ok
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(
BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/zip',
size=len(archive_content),
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false')
self.assertIn(b'Missing SLUG header', response.content)
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
@istest
def post_deposit_multipart(self):
"""one multipart deposit should be accepted
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
# from django.core.files import uploadedfile
data_atom_entry = self.data_atom_entry_ok
- archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(
- BytesIO(archive_content),
- field_name='archive0',
- name='archive0',
+ BytesIO(self.archive['data']),
+ field_name=self.archive['name'],
+ name=self.archive['name'],
content_type='application/zip',
- size=len(archive_content),
+ size=self.archive['length'],
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG=external_id)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_READY)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
self.assertEquals(len(deposit_requests), 2)
for deposit_request in deposit_requests:
self.assertEquals(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
- self.assertRegex(deposit_request.archive.name, 'archive0')
+ self.assertRegex(deposit_request.archive.name,
+ self.archive['name'])
else:
self.assertEquals(
deposit_request.metadata[
'{http://www.w3.org/2005/Atom}id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
@istest
def post_deposit_multipart_put_to_replace_metadata(self):
"""One multipart deposit followed by a metadata update should be
accepted
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
data_atom_entry = self.data_atom_entry_ok
archive = InMemoryUploadedFile(
BytesIO(self.archive['data']),
field_name=self.archive['name'],
name=self.archive['name'],
content_type='application/zip',
size=self.archive['length'],
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='true',
HTTP_SLUG=external_id)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, 'partial')
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
self.assertEquals(len(deposit_requests), 2)
for deposit_request in deposit_requests:
self.assertEquals(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
self.assertRegex(deposit_request.archive.name,
self.archive['name'])
else:
self.assertEquals(
deposit_request.metadata[
'{http://www.w3.org/2005/Atom}id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
replace_metadata_uri = response._headers['location'][1]
response = self.client.put(
replace_metadata_uri,
content_type='application/atom+xml;type=entry',
data=self.data_atom_entry_update_in_place,
HTTP_IN_PROGRESS='false')
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
# deposit_id did not change
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_READY)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
self.assertEquals(len(deposit_requests), 2)
for deposit_request in deposit_requests:
self.assertEquals(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
self.assertRegex(deposit_request.archive.name,
self.archive['name'])
else:
self.assertEquals(
deposit_request.metadata[
'{http://www.w3.org/2005/Atom}id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b')
# FAILURE scenarios
@istest
def post_deposit_multipart_only_archive_and_atom_entry(self):
"""Multipart deposit only accepts one archive and one atom+xml"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
# from django.core.files import uploadedfile
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/zip',
size=len(archive_content),
charset=None)
other_archive_content = b"some-other-content"
other_archive = InMemoryUploadedFile(BytesIO(other_archive_content),
field_name='atom0',
name='atom0',
content_type='application/zip',
size=len(other_archive_content),
charset='utf-8')
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': other_archive,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG='external-id')
# then
self.assertEqual(response.status_code,
status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
# when
archive.seek(0)
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG='external-id')
# then
self.assertEqual(response.status_code,
status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
diff --git a/swh/deposit/tests/api/test_deposit_read_archive.py b/swh/deposit/tests/api/test_deposit_read_archive.py
index 4dae6876..c14dda27 100644
--- a/swh/deposit/tests/api/test_deposit_read_archive.py
+++ b/swh/deposit/tests/api/test_deposit_read_archive.py
@@ -1,144 +1,128 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import hashlib
import os
-import shutil
-import tempfile
from django.core.urlresolvers import reverse
from nose.tools import istest
from nose.plugins.attrib import attr
from rest_framework import status
from rest_framework.test import APITestCase
from swh.loader.tar import tarball
from swh.deposit.config import PRIVATE_GET_RAW_CONTENT
from swh.deposit.tests import TEST_CONFIG
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
-from ..common import create_arborescence_zip
+from ..common import FileSystemCreationRoutine, create_arborescence_zip
@attr('fs')
-class DepositReadArchivesTest(APITestCase, WithAuthTestCase, BasicTestCase,
- CommonCreationRoutine):
+class DepositReadArchivesTest(APITestCase, WithAuthTestCase,
+ BasicTestCase, CommonCreationRoutine,
+ FileSystemCreationRoutine):
def setUp(self):
super().setUp()
-
- root_path = '/tmp/swh-deposit/test/build-zip/'
- os.makedirs(root_path, exist_ok=True)
-
- self.archive = create_arborescence_zip(
- root_path, 'archive1', 'file1', b'some content in file')
-
self.archive2 = create_arborescence_zip(
- root_path, 'archive2', 'file2', b'some other content in file')
-
- self.workdir = tempfile.mkdtemp(dir=root_path)
- self.root_path = root_path
-
- def tearDown(self):
- shutil.rmtree(self.root_path)
+ self.root_path, 'archive2', 'file2', b'some other content in file')
+ self.workdir = os.path.join(self.root_path, 'workdir')
@istest
def access_to_existing_deposit_with_one_archive(self):
"""Access to deposit should stream a 200 response with its raw content
"""
- deposit_id = self.create_simple_binary_deposit(
- archive_path=self.archive['path'])
+ deposit_id = self.create_simple_binary_deposit()
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[self.collection.name, deposit_id])
r = self.client.get(url)
self.assertEquals(r.status_code, status.HTTP_200_OK)
self.assertEquals(r._headers['content-type'][1],
'application/octet-stream')
data = r.content
actual_sha1 = hashlib.sha1(data).hexdigest()
self.assertEquals(actual_sha1, self.archive['sha1sum'])
# this does not touch the extraction dir so this should stay empty
self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), [])
def _check_tarball_consistency(self, actual_sha1):
tarball.uncompress(self.archive['path'], self.workdir)
self.assertEquals(os.listdir(self.workdir), ['file1'])
tarball.uncompress(self.archive2['path'], self.workdir)
lst = set(os.listdir(self.workdir))
self.assertEquals(lst, {'file1', 'file2'})
new_path = self.workdir + '.zip'
tarball.compress(new_path, 'zip', self.workdir)
with open(new_path, 'rb') as f:
h = hashlib.sha1(f.read()).hexdigest()
self.assertEqual(actual_sha1, h)
self.assertNotEqual(actual_sha1, self.archive['sha1sum'])
self.assertNotEqual(actual_sha1, self.archive2['sha1sum'])
@istest
def access_to_existing_deposit_with_multiple_archives(self):
"""Access to deposit should stream a 200 response with its raw contents
"""
- deposit_id = self.create_complex_binary_deposit(
- archive_path=self.archive['path'],
- archive_path2=self.archive2['path'])
+ deposit_id = self.create_complex_binary_deposit()
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[self.collection.name, deposit_id])
r = self.client.get(url)
self.assertEquals(r.status_code, status.HTTP_200_OK)
self.assertEquals(r._headers['content-type'][1],
'application/octet-stream')
data = r.content
actual_sha1 = hashlib.sha1(data).hexdigest()
self._check_tarball_consistency(actual_sha1)
# this touches the extraction directory but should clean up
# after itself
self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), [])
class DepositReadArchivesFailureTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine):
@istest
def access_to_nonexisting_deposit_returns_404_response(self):
"""Read unknown collection should return a 404 response
"""
unknown_id = '999'
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[self.collection.name, unknown_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_404_NOT_FOUND)
self.assertIn('Deposit with id %s does not exist' % unknown_id,
response.content.decode('utf-8'))
@istest
def access_to_nonexisting_collection_returns_404_response(self):
"""Read unknown deposit should return a 404 response
"""
collection_name = 'non-existing'
deposit_id = self.create_deposit_partial()
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[collection_name, deposit_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_404_NOT_FOUND)
self.assertIn('Unknown collection name %s' % collection_name,
response.content.decode('utf-8'))
diff --git a/swh/deposit/tests/api/test_deposit_status.py b/swh/deposit/tests/api/test_deposit_status.py
index 536aaca5..3a6223fe 100644
--- a/swh/deposit/tests/api/test_deposit_status.py
+++ b/swh/deposit/tests/api/test_deposit_status.py
@@ -1,78 +1,76 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import hashlib
from django.core.urlresolvers import reverse
from io import BytesIO
from nose.tools import istest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.models import Deposit
from swh.deposit.parsers import parse_xml
-from ..common import BasicTestCase, WithAuthTestCase
+from ..common import BasicTestCase, WithAuthTestCase, FileSystemCreationRoutine
from ...config import COL_IRI, STATE_IRI, DEPOSIT_STATUS_READY
-class DepositStatusTestCase(APITestCase, WithAuthTestCase, BasicTestCase):
+class DepositStatusTestCase(APITestCase, WithAuthTestCase, BasicTestCase,
+ FileSystemCreationRoutine):
"""Status on deposit
"""
@istest
def post_deposit_with_status_check(self):
"""Binary upload should be accepted
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
- data=data_text,
+ data=self.archive['data'],
# + headers
+ CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum,
+ HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
- HTTP_CONTENT_LENGTH=len(data_text),
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
deposit = Deposit.objects.get(external_id=external_id)
status_url = reverse(STATE_IRI,
args=[self.collection.name, deposit.id])
# check status
status_response = self.client.get(status_url)
self.assertEqual(status_response.status_code, status.HTTP_200_OK)
r = parse_xml(BytesIO(status_response.content))
self.assertEqual(r['{http://www.w3.org/2005/Atom}deposit_id'],
deposit.id)
self.assertEqual(r['{http://www.w3.org/2005/Atom}status'],
DEPOSIT_STATUS_READY)
self.assertEqual(r['{http://www.w3.org/2005/Atom}detail'],
'Deposit is fully received, checked, and ready for '
'injection')
@istest
def status_on_unknown_deposit(self):
"""Asking for the status of unknown deposit returns 404 response"""
status_url = reverse(STATE_IRI, args=[self.collection.name, 999])
status_response = self.client.get(status_url)
self.assertEqual(status_response.status_code,
status.HTTP_404_NOT_FOUND)
diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py
index c9e23469..de40e70f 100644
--- a/swh/deposit/tests/api/test_deposit_update.py
+++ b/swh/deposit/tests/api/test_deposit_update.py
@@ -1,346 +1,337 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import hashlib
-
from django.core.urlresolvers import reverse
from nose.tools import istest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.config import EDIT_SE_IRI, EM_IRI
+
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
+from ..common import FileSystemCreationRoutine, create_arborescence_zip
-class DepositReplaceExistingDataTest(APITestCase, WithAuthTestCase,
- BasicTestCase, CommonCreationRoutine):
+class DepositUpdateOrReplaceExistingDataTest(
+ APITestCase, WithAuthTestCase, BasicTestCase,
+ FileSystemCreationRoutine, CommonCreationRoutine):
"""Try put/post (update/replace) query on EM_IRI
"""
def setUp(self):
super().setUp()
self.atom_entry_data1 = b"""
bar
"""
+ self.atom_entry_data1 = b"""
+
+ bar
+"""
+
+ self.archive2 = create_arborescence_zip(
+ self.root_path, 'archive2', 'file2', b'some other content in file')
+
@istest
def replace_archive_to_deposit_is_possible(self):
"""Replace all archive with another one should return a 204 response
"""
# given
- deposit_id = self.create_deposit_partial()
+ deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive'])
assert len(list(requests)) == 1
- assert 'filename0' in requests[0].archive.name
+ assert self.archive['name'] in requests[0].archive.name
+
+ # we have no metadata for that deposit
+ requests = list(DepositRequest.objects.filter(
+ deposit=deposit, type=self.deposit_request_types['metadata']))
+ assert len(requests) == 0
+
+ deposit_id = self._update_deposit_with_status(deposit_id,
+ status_partial=True)
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
assert len(requests) == 1
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
external_id = 'some-external-id-1'
response = self.client.put(
update_uri,
content_type='application/zip', # as zip
- data=data_text,
+ data=self.archive2['data'],
# + headers
+ CONTENT_LENGTH=self.archive2['length'],
HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum,
+ HTTP_CONTENT_MD5=self.archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
- HTTP_CONTENT_LENGTH=len(data_text),
- HTTP_CONTENT_DISPOSITION='attachment; filename=otherfilename')
+ HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
+ self.archive2['name'], ))
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive'])
self.assertEquals(len(list(requests)), 1)
- self.assertRegex(requests[0].archive.name, 'otherfilename')
+ self.assertRegex(requests[0].archive.name, self.archive2['name'])
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
self.assertEquals(len(requests), 1)
@istest
def replace_metadata_to_deposit_is_possible(self):
"""Replace all metadata with another one should return a 204 response
"""
# given
- deposit_id = self.create_deposit_partial()
+ deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata'])
-
- assert len(list(requests)) == 1
- external_id_key = '{http://www.w3.org/2005/Atom}external_identifier'
- assert requests[0].metadata[external_id_key] == 'some-external-id'
+ assert len(list(requests)) == 0
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
assert len(requests) == 1
update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name,
deposit_id])
response = self.client.put(
update_uri,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata'])
self.assertEquals(len(list(requests)), 1)
metadata = requests[0].metadata
- self.assertIsNone(metadata.get(external_id_key))
self.assertEquals(metadata["{http://www.w3.org/2005/Atom}foobar"],
'bar')
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
self.assertEquals(len(requests), 1)
-
-class DepositUpdateDepositWithNewDataTest(
- APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine):
- """Testing Replace/Update on EDIT_SE_IRI class.
-
- """
- def setUp(self):
- super().setUp()
-
- self.atom_entry_data1 = b"""
-
- bar
-"""
-
@istest
def add_archive_to_deposit_is_possible(self):
"""Add another archive to a deposit return a 201 response
"""
# given
- deposit_id = self.create_deposit_partial()
+ deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive'])
assert len(list(requests)) == 1
- assert 'filename0' in requests[0].archive.name
+ assert self.archive['name'] in requests[0].archive.name
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
- assert len(requests) == 1
+ assert len(requests) == 0
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
external_id = 'some-external-id-1'
response = self.client.post(
update_uri,
content_type='application/zip', # as zip
- data=data_text,
+ data=self.archive2['data'],
# + headers
+ CONTENT_LENGTH=self.archive2['length'],
HTTP_SLUG=external_id,
- HTTP_CONTENT_MD5=md5sum,
+ HTTP_CONTENT_MD5=self.archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
- HTTP_CONTENT_LENGTH=len(data_text),
- HTTP_CONTENT_DISPOSITION='attachment; filename=otherfilename')
+ HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
+ self.archive2['name'],))
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
requests = list(DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive']).order_by('id'))
self.assertEquals(len(requests), 2)
# first archive still exists
- self.assertRegex(requests[0].archive.name, 'filename0')
+ self.assertRegex(requests[0].archive.name, self.archive['name'])
# a new one was added
- self.assertRegex(requests[1].archive.name, 'otherfilename')
+ self.assertRegex(requests[1].archive.name, self.archive2['name'])
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
- self.assertEquals(len(requests), 1)
+ self.assertEquals(len(requests), 0)
@istest
def add_metadata_to_deposit_is_possible(self):
- """Replace all metadata with another one should return a 204 response
+ """Add metadata with another one should return a 204 response
"""
# given
deposit_id = self.create_deposit_partial()
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata'])
- assert len(list(requests)) == 1
- external_id_key = '{http://www.w3.org/2005/Atom}external_identifier'
- assert requests[0].metadata[external_id_key] == 'some-external-id'
+ assert len(list(requests)) == 2
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
- assert len(requests) == 1
+ assert len(requests) == 0
update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name,
deposit_id])
response = self.client.post(
update_uri,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata']).order_by('id')
- self.assertEquals(len(list(requests)), 2)
- # first metadata still exists
- self.assertEquals(requests[0].metadata[external_id_key],
- 'some-external-id')
+ self.assertEquals(len(list(requests)), 3)
# a new one was added
self.assertEquals(requests[1].metadata[
- "{http://www.w3.org/2005/Atom}foobar"],
- 'bar')
+ "{http://www.w3.org/2005/Atom}foobar"], 'bar')
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
- self.assertEquals(len(requests), 1)
+ self.assertEquals(len(requests), 0)
class DepositUpdateFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase,
CommonCreationRoutine):
"""Failure scenario about add/replace (post/put) query on deposit.
"""
@istest
def add_metadata_to_unknown_collection(self):
"""Replacing metadata to unknown deposit should return a 404 response
"""
url = reverse(EDIT_SE_IRI,
args=['unknown', 999]),
response = self.client.post(
url,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
@istest
def add_metadata_to_unknown_deposit(self):
"""Replacing metadata to unknown deposit should return a 404 response
"""
url = reverse(EDIT_SE_IRI,
args=[self.collection.name, 999]),
response = self.client.post(
url,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
@istest
def replace_metadata_to_unknown_deposit(self):
"""Adding metadata to unknown deposit should return a 404 response
"""
url = reverse(EDIT_SE_IRI,
args=[self.collection.name, 999]),
response = self.client.put(
url,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
@istest
def add_archive_to_unknown_deposit(self):
"""Adding metadata to unknown deposit should return a 404 response
"""
url = reverse(EM_IRI,
args=[self.collection.name, 999]),
response = self.client.post(
url,
content_type='application/zip',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
@istest
def replace_archive_to_unknown_deposit(self):
"""Replacing archive to unknown deposit should return a 404 response
"""
url = reverse(EM_IRI,
args=[self.collection.name, 999]),
response = self.client.put(
url,
content_type='application/zip',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
@istest
def post_metadata_to_em_iri_failure(self):
"""Add archive with wrong content type should return a 400 response
"""
deposit_id = self.create_deposit_ready()
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
response = self.client.put(
update_uri,
content_type='application/binary',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
@istest
def put_metadata_to_em_iri_failure(self):
"""Update archive with wrong content type should return 400 response
"""
# given
deposit_id = self.create_deposit_ready()
# when
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
response = self.client.put(
update_uri,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
# then
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py
index 97c3e72a..9b3bd8fe 100644
--- a/swh/deposit/tests/common.py
+++ b/swh/deposit/tests/common.py
@@ -1,286 +1,310 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import hashlib
import os
import shutil
import tempfile
from django.core.urlresolvers import reverse
from django.test import TestCase
from io import BytesIO
+from nose.plugins.attrib import attr
from rest_framework import status
-from swh.deposit.config import COL_IRI, EM_IRI
+from swh.deposit.config import COL_IRI, EM_IRI, EDIT_SE_IRI
from swh.deposit.models import DepositClient, DepositCollection
from swh.deposit.models import DepositRequestType
from swh.deposit.parsers import parse_xml
from swh.deposit.settings.testing import MEDIA_ROOT
from swh.loader.tar import tarball
def create_arborescence_zip(root_path, archive_name, filename, content,
up_to_size=None):
"""Build an archive named archive_name in the root_path.
This archive contains one file named filename with the content content.
Returns:
dict with the keys:
- dir: the directory of that archive
- path: full path to the archive
- sha1sum: archive's sha1sum
- length: archive's length
"""
os.makedirs(root_path, exist_ok=True)
archive_path_dir = tempfile.mkdtemp(dir=root_path)
dir_path = os.path.join(archive_path_dir, archive_name)
os.mkdir(dir_path)
filepath = os.path.join(dir_path, filename)
l = len(content)
count = 0
batch_size = 128
with open(filepath, 'wb') as f:
f.write(content)
if up_to_size: # fill with blank content up to a given size
count += l
while count < up_to_size:
f.write(b'0'*batch_size)
count += batch_size
zip_path = dir_path + '.zip'
zip_path = tarball.compress(zip_path, 'zip', dir_path)
with open(zip_path, 'rb') as f:
length = 0
sha1sum = hashlib.sha1()
md5sum = hashlib.md5()
data = b''
for chunk in f:
sha1sum.update(chunk)
md5sum.update(chunk)
length += len(chunk)
data += chunk
return {
'dir': archive_path_dir,
'name': archive_name,
'data': data,
'path': zip_path,
'sha1sum': sha1sum.hexdigest(),
'md5sum': md5sum.hexdigest(),
'length': length,
}
+@attr('fs')
+class FileSystemCreationRoutine(TestCase):
+ """Mixin intended for tests needed to tamper with archives.
+
+ """
+ def setUp(self):
+ """Define the test client and other test variables."""
+ super().setUp()
+ self.root_path = '/tmp/swh-deposit/test/build-zip/'
+ os.makedirs(self.root_path, exist_ok=True)
+
+ self.archive = create_arborescence_zip(
+ self.root_path, 'archive1', 'file1', b'some content in file')
+
+ def tearDown(self):
+ super().tearDown()
+ shutil.rmtree(self.root_path)
+
+ def create_simple_binary_deposit(self, status_partial=False):
+ response = self.client.post(
+ reverse(COL_IRI, args=[self.collection.name]),
+ content_type='application/zip',
+ data=self.archive['data'],
+ CONTENT_LENGTH=self.archive['length'],
+ HTTP_MD5SUM=self.archive['md5sum'],
+ HTTP_SLUG='external-id',
+ HTTP_IN_PROGRESS=status_partial,
+ HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
+ self.archive['name'], ))
+
+ # then
+ assert response.status_code == status.HTTP_201_CREATED
+ response_content = parse_xml(BytesIO(response.content))
+ deposit_id = response_content[
+ '{http://www.w3.org/2005/Atom}deposit_id']
+ return deposit_id
+
+ def create_complex_binary_deposit(self, status_partial=False):
+ deposit_id = self.create_simple_binary_deposit(
+ status_partial=True)
+
+ # Add a second archive to the deposit
+ # update its status to DEPOSIT_STATUS_READY
+ response = self.client.post(
+ reverse(EM_IRI, args=[self.collection.name, deposit_id]),
+ content_type='application/zip',
+ data=self.archive2['data'],
+ CONTENT_LENGTH=self.archive2['length'],
+ HTTP_MD5SUM=self.archive2['md5sum'],
+ HTTP_SLUG='external-id',
+ HTTP_IN_PROGRESS=status_partial,
+ HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip')
+
+ # then
+ assert response.status_code == status.HTTP_201_CREATED
+ response_content = parse_xml(BytesIO(response.content))
+ deposit_id = response_content[
+ '{http://www.w3.org/2005/Atom}deposit_id']
+ return deposit_id
+
+
+@attr('fs')
class BasicTestCase(TestCase):
"""Mixin intended for data setup purposes (user, collection, etc...)
"""
def setUp(self):
"""Define the test client and other test variables."""
super().setUp()
# expanding diffs in tests
self.maxDiff = None
# basic minimum test data
deposit_request_types = {}
# Add deposit request types
for deposit_request_type in ['archive', 'metadata']:
drt = DepositRequestType(name=deposit_request_type)
drt.save()
deposit_request_types[deposit_request_type] = drt
_name = 'hal'
# set collection up
_collection = DepositCollection(name=_name)
_collection.save()
# set user/client up
_client = DepositClient.objects.create_user(username=_name,
password=_name)
_client.collections = [_collection.id]
_client.save()
self.collection = _collection
self.user = _client
self.username = _name
self.userpass = _name
self.deposit_request_types = deposit_request_types
def tearDown(self):
+ super().tearDown()
# Clean up uploaded files in temporary directory (tests have
# their own media root folder)
if os.path.exists(MEDIA_ROOT):
for d in os.listdir(MEDIA_ROOT):
shutil.rmtree(os.path.join(MEDIA_ROOT, d))
class WithAuthTestCase(TestCase):
"""Mixin intended for testing the api with basic authentication.
"""
def setUp(self):
super().setUp()
_token = '%s:%s' % (self.username, self.userpass)
token = base64.b64encode(_token.encode('utf-8'))
authorization = 'Basic %s' % token.decode('utf-8')
self.client.credentials(HTTP_AUTHORIZATION=authorization)
def tearDown(self):
super().tearDown()
self.client.credentials()
class CommonCreationRoutine(TestCase):
"""Mixin class to share initialization routine.
cf:
`class`:test_deposit_update.DepositReplaceExistingDataTest
`class`:test_deposit_update.DepositUpdateDepositWithNewDataTest
`class`:test_deposit_update.DepositUpdateFailuresTest
`class`:test_deposit_delete.DepositDeleteTest
"""
def setUp(self):
super().setUp()
self.atom_entry_data0 = b"""
some-external-id
"""
- def create_simple_deposit_partial(self):
- """Create a simple deposit (1 request) in `partial` state and returns
- its new identifier.
-
- Returns:
- deposit id
-
- """
- response = self.client.post(
- reverse(COL_IRI, args=[self.collection.name]),
- content_type='application/atom+xml;type=entry',
- data=self.atom_entry_data0,
- HTTP_SLUG='external-id',
- HTTP_IN_PROGRESS='true')
-
- assert response.status_code == status.HTTP_201_CREATED
- response_content = parse_xml(BytesIO(response.content))
- deposit_id = response_content[
- '{http://www.w3.org/2005/Atom}deposit_id']
- return deposit_id
+ self.atom_entry_data1 = b"""
+
+ anotherthing
+"""
- def _init_data_from(self, archive_path, default_data):
- if not archive_path:
- data = default_data
- else:
- with open(archive_path, 'rb') as f:
- data = f.read()
+ def create_deposit_with_status_rejected(self):
+ url = reverse(COL_IRI, args=[self.collection.name])
+ data = b'some data which is clearly not a zip file'
md5sum = hashlib.md5(data).hexdigest()
- return data, md5sum
-
- def create_simple_binary_deposit(self, status_partial=False,
- archive_path=None):
-
- data, md5sum = self._init_data_from(
- archive_path, b'some simulation data to pass as binary package')
+ external_id = 'some-external-id-1'
+ # when
response = self.client.post(
- reverse(COL_IRI, args=[self.collection.name]),
- content_type='application/zip',
+ url,
+ content_type='application/zip', # as zip
data=data,
- HTTP_MD5SUM=md5sum,
- HTTP_SLUG='external-id',
- HTTP_IN_PROGRESS=status_partial,
- HTTP_CONTENT_DISPOSITION='attachment; filename=filename0.zip')
+ # + headers
+ CONTENT_LENGTH=len(data),
+ # other headers needs HTTP_ prefix to be taken into account
+ HTTP_SLUG=external_id,
+ HTTP_CONTENT_MD5=md5sum,
+ HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
+ HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
- # then
- assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
- return deposit_id
- def create_complex_binary_deposit(self, status_partial=False,
- archive_path=None,
- archive_path2=None):
+ return deposit_id
- deposit_id = self.create_simple_binary_deposit(
- status_partial=True,
- archive_path=archive_path)
+ def create_simple_deposit_partial(self):
+ """Create a simple deposit (1 request) in `partial` state and returns
+ its new identifier.
- # Update the deposit to add another archive
- # and update its status to DEPOSIT_STATUS_READY
- data, md5sum = self._init_data_from(
- archive_path2, b'some other data to pass as binary package')
+ Returns:
+ deposit id
- # Add a second archive to the deposit
- # update its status to DEPOSIT_STATUS_READY
+ """
response = self.client.post(
- reverse(EM_IRI, args=[self.collection.name, deposit_id]),
- content_type='application/zip',
- data=data,
- HTTP_MD5SUM=md5sum,
+ reverse(COL_IRI, args=[self.collection.name]),
+ content_type='application/atom+xml;type=entry',
+ data=self.atom_entry_data0,
HTTP_SLUG='external-id',
- HTTP_IN_PROGRESS=status_partial,
- HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip')
+ HTTP_IN_PROGRESS='true')
- # then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
return deposit_id
def _update_deposit_with_status(self, deposit_id, status_partial=False):
"""Add to a given deposit another archive and update its current
status to `ready` (by default).
Returns:
deposit id
"""
- # add an archive
- data_text = b'some content'
- md5sum = hashlib.md5(data_text).hexdigest()
-
# when
response = self.client.post(
- reverse(EM_IRI, args=[self.collection.name, deposit_id]),
- content_type='application/zip', # as zip
- data=data_text,
- # + headers
- HTTP_CONTENT_MD5=md5sum,
- HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
- HTTP_IN_PROGRESS=status_partial,
- HTTP_CONTENT_LENGTH=len(data_text),
- HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
+ reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
+ content_type='application/atom+xml;type=entry',
+ data=self.atom_entry_data1,
+ HTTP_SLUG='external-id',
+ HTTP_IN_PROGRESS=status_partial)
# then
assert response.status_code == status.HTTP_201_CREATED
return deposit_id
def create_deposit_ready(self):
"""Create a complex deposit (2 requests) in status `ready`.
"""
deposit_id = self.create_simple_deposit_partial()
deposit_id = self._update_deposit_with_status(deposit_id)
return deposit_id
def create_deposit_partial(self):
"""Create a complex deposit (2 requests) in status `partial`.
"""
deposit_id = self.create_simple_deposit_partial()
deposit_id = self._update_deposit_with_status(
deposit_id, status_partial=True)
return deposit_id