diff --git a/swh/deposit/tests/api/data/atom/entry-data-ko.xml b/swh/deposit/tests/api/data/atom/entry-data-ko.xml
new file mode 100644
index 00000000..3f5d8802
--- /dev/null
+++ b/swh/deposit/tests/api/data/atom/entry-data-ko.xml
@@ -0,0 +1,6 @@
+
+
+
+ urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
+
diff --git a/swh/deposit/tests/api/data/atom/entry-update-in-place.xml b/swh/deposit/tests/api/data/atom/entry-update-in-place.xml
new file mode 100644
index 00000000..1a7d7bbb
--- /dev/null
+++ b/swh/deposit/tests/api/data/atom/entry-update-in-place.xml
@@ -0,0 +1,7 @@
+
+
+ urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b
+ Title
+ Type
+
diff --git a/swh/deposit/tests/api/test_deposit_binary.py b/swh/deposit/tests/api/test_deposit_binary.py
index d4f55c07..7d3eac5d 100644
--- a/swh/deposit/tests/api/test_deposit_binary.py
+++ b/swh/deposit/tests/api/test_deposit_binary.py
@@ -1,580 +1,544 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
-import re
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.urls import reverse
from io import BytesIO
from rest_framework import status
from swh.deposit.tests import TEST_CONFIG
from swh.deposit.config import (
COL_IRI, EM_IRI, DEPOSIT_STATUS_DEPOSITED,
)
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.parsers import parse_xml
-from swh.deposit.tests.common import create_arborescence_archive
-
-
-def check_archive(archive_name: str, archive_name_to_check: str):
- """Helper function to ensure archive_name is present within the
- archive_name_to_check.
-
- Raises:
- AssertionError if archive_name is not present within
- archive_name_to_check
-
- """
- if '.' in archive_name:
- filename, extension = archive_name.split('.')
- pattern = re.compile('.*/%s.*\\.%s' % (filename, extension))
- else:
- pattern = re.compile('.*/%s' % archive_name)
- assert pattern.match(archive_name_to_check) is not None
-
-
-def test_check_archive_helper():
- # success
- for archive_name, archive_name_to_check in [
- ('filename0', 'something/filename0'),
- ('archive.zip', 'client_1/archive_noisynoise.zip'),
- ]:
- check_archive(archive_name, archive_name_to_check)
-
- # failures
- for archive_name, archive_name_to_check in [
- ('filename0', 'something-filename0'),
- ('archive.zip', 'client_1_archive_noisynoise.zip'),
- ('reference', 'irrelevant'),
- ]:
- with pytest.raises(AssertionError):
- check_archive(archive_name, archive_name_to_check)
+from swh.deposit.tests.common import create_arborescence_archive, check_archive
def test_post_deposit_binary_no_slug(
        authenticated_client, deposit_collection, sample_archive):
    """A binary deposit sent without the Slug header is rejected (400)

    """
    # every header of the request is listed here; HTTP_SLUG is not
    # part of them
    headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_IN_PROGRESS': 'false',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=filename0',
    }
    collection_iri = reverse(COL_IRI, args=[deposit_collection.name])

    reply = authenticated_client.post(
        collection_iri,
        content_type='application/zip',
        data=sample_archive['data'],
        **headers)

    assert b'Missing SLUG header' in reply.content
    assert reply.status_code == status.HTTP_400_BAD_REQUEST
def test_post_deposit_binary_support(
        authenticated_client, deposit_collection, sample_archive):
    """An upload whose content-type is not one of [zip, x-tar] gets a 415

    """
    external_id = 'some-external-id-1'
    request_headers = dict(
        CONTENT_LENGTH=sample_archive['length'],
        HTTP_SLUG=external_id,
        HTTP_CONTENT_MD5=sample_archive['md5sum'],
        HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
        HTTP_IN_PROGRESS='false',
        HTTP_CONTENT_DISPOSITION='attachment; filename=filename0',
    )

    # the archive is announced as a generic octet-stream
    answer = authenticated_client.post(
        reverse(COL_IRI, args=[deposit_collection.name]),
        content_type='application/octet-stream',
        data=sample_archive['data'],
        **request_headers)

    assert answer.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE
    # no deposit must exist for the external id used in the request
    with pytest.raises(Deposit.DoesNotExist):
        Deposit.objects.get(external_id=external_id)
def test_post_deposit_binary_upload_ok(
        authenticated_client, deposit_collection, sample_archive):
    """Binary upload with correct headers should return 201 with receipt

    The receipt returned in the response body must describe the deposit
    stored in db, and the Location header must point to the edit_se_iri
    of that deposit.

    """
    # given
    url = reverse(COL_IRI, args=[deposit_collection.name])
    external_id = 'some-external-id-1'

    # when
    response = authenticated_client.post(
        url,
        content_type='application/zip',  # as zip
        data=sample_archive['data'],
        # + headers
        CONTENT_LENGTH=sample_archive['length'],
        # other headers needs HTTP_ prefix to be taken into account
        HTTP_SLUG=external_id,
        HTTP_CONTENT_MD5=sample_archive['md5sum'],
        HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
        HTTP_IN_PROGRESS='false',
        HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
            sample_archive['name'], ))

    # then
    # the status is checked before anything is read from the body, so
    # a failing request is reported through its status code
    assert response.status_code == status.HTTP_201_CREATED

    # the receipt is parsed once and reused for every check below
    response_content = parse_xml(BytesIO(response.content))
    deposit_id = response_content['deposit_id']

    deposit = Deposit.objects.get(pk=deposit_id)
    assert deposit.status == DEPOSIT_STATUS_DEPOSITED
    assert deposit.external_id == external_id
    assert deposit.collection == deposit_collection
    assert deposit.swh_id is None

    deposit_request = DepositRequest.objects.get(deposit=deposit)
    check_archive(sample_archive['name'], deposit_request.archive.name)
    assert deposit_request.metadata is None
    assert deposit_request.raw_metadata is None

    # receipt content must match what has been stored
    assert response_content['deposit_archive'] == sample_archive['name']
    assert int(response_content['deposit_id']) == deposit.id
    assert response_content['deposit_status'] == deposit.status

    edit_se_iri = reverse('edit_se_iri',
                          args=[deposit_collection.name, deposit.id])

    # headers are read through the public mapping interface of the
    # response rather than through its private `_headers` attribute
    assert response['Location'] == 'http://testserver' + edit_se_iri
def test_post_deposit_binary_failure_unsupported_packaging_header(
        authenticated_client, deposit_collection, sample_archive):
    """A binary deposit sent with an unsupported Packaging header is
    refused with a 400 and no deposit is created

    """
    external_id = 'some-external-id'
    collection_iri = reverse(COL_IRI, args=[deposit_collection.name])
    headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        # packaging value under test
        'HTTP_PACKAGING': 'something-unsupported',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=filename0',
    }

    reply = authenticated_client.post(
        collection_iri,
        content_type='application/zip',
        data=sample_archive['data'],
        **headers)

    assert reply.status_code == status.HTTP_400_BAD_REQUEST
    # nothing must have been recorded for that external id
    with pytest.raises(Deposit.DoesNotExist):
        Deposit.objects.get(external_id=external_id)
def test_post_deposit_binary_upload_no_content_disposition_header(
        authenticated_client, deposit_collection, sample_archive):
    """A binary deposit sent without the Content-Disposition header is
    refused with a 400 and no deposit is created

    """
    external_id = 'some-external-id'
    # HTTP_CONTENT_DISPOSITION is left out of the request headers
    request_headers = dict(
        CONTENT_LENGTH=sample_archive['length'],
        HTTP_SLUG=external_id,
        HTTP_CONTENT_MD5=sample_archive['md5sum'],
        HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
        HTTP_IN_PROGRESS='false',
    )

    answer = authenticated_client.post(
        reverse(COL_IRI, args=[deposit_collection.name]),
        content_type='application/zip',
        data=sample_archive['data'],
        **request_headers)

    assert answer.status_code == status.HTTP_400_BAD_REQUEST
    with pytest.raises(Deposit.DoesNotExist):
        Deposit.objects.get(external_id=external_id)
def test_post_deposit_mediation_not_supported(
        authenticated_client, deposit_collection, sample_archive):
    """A binary deposit made on behalf of someone else (mediation) is
    answered with a 412 and no deposit is created

    """
    external_id = 'some-external-id-1'
    collection_iri = reverse(COL_IRI, args=[deposit_collection.name])
    headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_IN_PROGRESS': 'false',
        # header asking for a mediated deposit
        'HTTP_ON_BEHALF_OF': 'someone',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=filename0',
    }

    reply = authenticated_client.post(
        collection_iri,
        content_type='application/zip',
        data=sample_archive['data'],
        **headers)

    assert reply.status_code == status.HTTP_412_PRECONDITION_FAILED
    with pytest.raises(Deposit.DoesNotExist):
        Deposit.objects.get(external_id=external_id)
def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded(
        authenticated_client, deposit_collection, sample_archive, tmp_path):
    """An archive generated with up_to_size set to the configured
    max_upload_size is refused with a 413 and no deposit is created

    """
    external_id = 'some-external-id'
    collection_iri = reverse(COL_IRI, args=[deposit_collection.name])

    # dedicated archive for this test, built under the tmp directory
    big_archive = create_arborescence_archive(
        str(tmp_path), 'archive2', 'file2', b'some content in file',
        up_to_size=TEST_CONFIG['max_upload_size'])

    headers = {
        'CONTENT_LENGTH': big_archive['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': big_archive['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_IN_PROGRESS': 'false',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=filename0',
    }
    reply = authenticated_client.post(
        collection_iri,
        content_type='application/zip',
        data=big_archive['data'],
        **headers)

    assert reply.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE
    assert b'Upload size limit exceeded' in reply.content
    with pytest.raises(Deposit.DoesNotExist):
        Deposit.objects.get(external_id=external_id)
def test_post_deposit_2_post_2_different_deposits(
        authenticated_client, deposit_collection, sample_archive):
    """2 posting deposits should return 2 different 201 with receipt

    Each post uses its own external id (slug) and must end up as a
    separate deposit in db.

    """
    url = reverse(COL_IRI, args=[deposit_collection.name])

    # when
    response = authenticated_client.post(
        url,
        content_type='application/zip',  # as zip
        data=sample_archive['data'],
        # + headers
        CONTENT_LENGTH=sample_archive['length'],
        HTTP_SLUG='some-external-id-1',
        HTTP_CONTENT_MD5=sample_archive['md5sum'],
        HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
        HTTP_IN_PROGRESS='false',
        HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')

    # then
    assert response.status_code == status.HTTP_201_CREATED

    response_content = parse_xml(BytesIO(response.content))
    deposit_id = response_content['deposit_id']

    deposit = Deposit.objects.get(pk=deposit_id)

    deposits = Deposit.objects.all()
    assert len(deposits) == 1
    assert deposits[0] == deposit

    # second post
    response = authenticated_client.post(
        url,
        content_type='application/x-tar',  # as tar
        data=sample_archive['data'],
        # + headers
        CONTENT_LENGTH=sample_archive['length'],
        HTTP_SLUG='another-external-id',
        HTTP_CONTENT_MD5=sample_archive['md5sum'],
        HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
        HTTP_IN_PROGRESS='false',
        HTTP_CONTENT_DISPOSITION='attachment; filename=filename1')

    assert response.status_code == status.HTTP_201_CREATED

    response_content = parse_xml(BytesIO(response.content))
    deposit_id2 = response_content['deposit_id']

    deposit2 = Deposit.objects.get(pk=deposit_id2)

    assert deposit != deposit2

    deposits = Deposit.objects.all().order_by('id')
    assert len(deposits) == 2
    # compare the actual content: `assert x, y` would only check that
    # x is truthy and use y as the failure message
    assert list(deposits) == [deposit, deposit2]
def test_post_deposit_binary_and_post_to_add_another_archive(
        authenticated_client, deposit_collection, sample_archive, tmp_path):
    """A partial deposit accepts a second archive: both posts return a
    201 and the deposit ends up deposited with 2 archive requests

    """
    tmp_path = str(tmp_path)
    external_id = 'some-external-id-1'
    collection_iri = reverse(COL_IRI, args=[deposit_collection.name])

    # 1st post: create the deposit and keep it in progress
    first_headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_IN_PROGRESS': 'true',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=%s' % (
            sample_archive['name'], ),
    }
    first_reply = authenticated_client.post(
        collection_iri,
        content_type='application/zip',
        data=sample_archive['data'],
        **first_headers)

    assert first_reply.status_code == status.HTTP_201_CREATED

    receipt = parse_xml(BytesIO(first_reply.content))
    deposit_id = receipt['deposit_id']

    deposit = Deposit.objects.get(pk=deposit_id)
    assert deposit.status == 'partial'
    assert deposit.external_id == external_id
    assert deposit.collection == deposit_collection
    assert deposit.swh_id is None

    first_request = DepositRequest.objects.get(deposit=deposit)
    assert first_request.deposit == deposit
    assert first_request.type == 'archive'
    check_archive(sample_archive['name'], first_request.archive.name)

    # 2nd post: another archive sent to the em iri of the same deposit,
    # this time without the in-progress header
    archive2 = create_arborescence_archive(
        tmp_path, 'archive2', 'file2', b'some other content in file')
    update_uri = reverse(
        EM_IRI, args=[deposit_collection.name, deposit_id])
    second_headers = {
        'CONTENT_LENGTH': archive2['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': archive2['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=%s' % (
            archive2['name']),
    }
    second_reply = authenticated_client.post(
        update_uri,
        content_type='application/zip',
        data=archive2['data'],
        **second_headers)

    assert second_reply.status_code == status.HTTP_201_CREATED
    # the receipt is parsed, its content is not inspected further
    receipt = parse_xml(BytesIO(second_reply.content))

    deposit = Deposit.objects.get(pk=deposit_id)
    assert deposit.status == DEPOSIT_STATUS_DEPOSITED
    assert deposit.external_id == external_id
    assert deposit.collection == deposit_collection
    assert deposit.swh_id is None

    archive_requests = list(
        DepositRequest.objects.filter(deposit=deposit).order_by('id'))

    # one request per uploaded archive, in upload order
    assert len(archive_requests) == 2
    uploaded_names = [sample_archive['name'], archive2['name']]
    for archive_request, uploaded_name in zip(
            archive_requests, uploaded_names):
        assert archive_request.deposit == deposit
        assert archive_request.type == 'archive'
        check_archive(uploaded_name, archive_request.archive.name)

    # still a single deposit in db
    all_deposits = Deposit.objects.all()
    assert len(all_deposits) == 1
def test_post_deposit_then_update_refused(
authenticated_client, deposit_collection,
sample_archive, atom_dataset, tmp_path):
"""Updating or adding to a finalized deposit should return a 400

The deposit is posted with In-Progress 'false' and is checked to be in
the DEPOSIT_STATUS_DEPOSITED state; PUT and POST requests on both the
em_iri and the edit_se_iri of that deposit must then be refused.
"""
tmp_path = str(tmp_path)
url = reverse(COL_IRI, args=[deposit_collection.name])
external_id = 'some-external-id-1'
# when: a binary deposit is posted with in-progress set to false
response = authenticated_client.post(
url,
content_type='application/zip', # as zip
data=sample_archive['data'],
# + headers
CONTENT_LENGTH=sample_archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=sample_archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then: the deposit is created and no longer partial
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.status == DEPOSIT_STATUS_DEPOSITED
assert deposit.external_id == external_id
assert deposit.collection == deposit_collection
assert deposit.swh_id is None
deposit_request = DepositRequest.objects.get(deposit=deposit)
assert deposit_request.deposit == deposit
check_archive('filename0', deposit_request.archive.name)
# from now on, updating/adding is forbidden
# the 2 iris used to update the content of the deposit
edit_se_iri = reverse(
'edit_se_iri', args=[deposit_collection.name, deposit_id])
em_iri = reverse(
'em_iri', args=[deposit_collection.name, deposit_id])
# Every update/add request below should fail since the deposit
# is in the DEPOSIT_STATUS_DEPOSITED state
archive2 = create_arborescence_archive(
tmp_path, 'archive2', 'file2', b'some content in file 2')
# 1. replacing the archive (PUT on em_iri) is no longer possible
r = authenticated_client.put(
em_iri,
content_type='application/zip',
data=archive2['data'],
CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
assert r.status_code == status.HTTP_400_BAD_REQUEST
# 2. adding an archive (POST on em_iri) is no longer possible
r = authenticated_client.post(
em_iri,
content_type='application/zip',
data=archive2['data'],
CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
assert r.status_code == status.HTTP_400_BAD_REQUEST
# 3. replacing the metadata (atom PUT on edit_se_iri) is no longer
# possible
r = authenticated_client.put(
edit_se_iri,
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data-deposit-binary'],
CONTENT_LENGTH=len(atom_dataset['entry-data-deposit-binary']),
HTTP_SLUG=external_id)
assert r.status_code == status.HTTP_400_BAD_REQUEST
# 4. adding new metadata (atom POST on edit_se_iri) is no longer
# possible
r = authenticated_client.post(
edit_se_iri,
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data-deposit-binary'],
CONTENT_LENGTH=len(atom_dataset['entry-data-deposit-binary']),
HTTP_SLUG=external_id)
assert r.status_code == status.HTTP_400_BAD_REQUEST
# multipart payload (archive + atom entry) shared by the 2 multipart
# requests below
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(
BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/zip',
size=len(archive_content),
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(atom_dataset['entry-data-deposit-binary']),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(atom_dataset['entry-data-deposit-binary']),
charset='utf-8')
# 5. replacing archive and metadata (multipart PUT on edit_se_iri)
# is no longer possible
r = authenticated_client.put(
edit_se_iri,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
})
assert r.status_code == status.HTTP_400_BAD_REQUEST
# 6. adding archive and metadata (multipart POST on edit_se_iri)
# is no longer possible
r = authenticated_client.post(
edit_se_iri,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
})
assert r.status_code == status.HTTP_400_BAD_REQUEST
diff --git a/swh/deposit/tests/api/test_deposit_multipart.py b/swh/deposit/tests/api/test_deposit_multipart.py
index 05a03832..d9420f8d 100644
--- a/swh/deposit/tests/api/test_deposit_multipart.py
+++ b/swh/deposit/tests/api/test_deposit_multipart.py
@@ -1,448 +1,389 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.urls import reverse
from io import BytesIO
from rest_framework import status
-from rest_framework.test import APITestCase
-from swh.deposit.config import COL_IRI
-from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED
+from swh.deposit.config import (
+ COL_IRI, DEPOSIT_STATUS_DEPOSITED
+)
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.parsers import parse_xml
-from ..common import BasicTestCase, WithAuthTestCase
-from ..common import FileSystemCreationRoutine
+from swh.deposit.tests.common import check_archive
+
+
+def test_post_deposit_multipart_without_slug_header_is_bad_request(
+ authenticated_client, deposit_collection, atom_dataset):
+ """A multipart deposit posted without the Slug header should return
+ a 400
+
+ """
+ # given
+ url = reverse(COL_IRI, args=[deposit_collection.name])
+
+ # archive part of the multipart request
+ archive_content = b'some content representing archive'
+ archive = InMemoryUploadedFile(
+ BytesIO(archive_content),
+ field_name='archive0',
+ name='archive0',
+ content_type='application/zip',
+ size=len(archive_content),
+ charset=None)
+
+ # atom entry part of the multipart request
+ data_atom_entry = atom_dataset['entry-data-deposit-binary']
+ atom_entry = InMemoryUploadedFile(
+ BytesIO(data_atom_entry),
+ field_name='atom0',
+ name='atom0',
+ content_type='application/atom+xml; charset="utf-8"',
+ size=len(data_atom_entry),
+ charset='utf-8')
+
+ # when
+ response = authenticated_client.post(
+ url,
+ format='multipart',
+ data={
+ 'archive': archive,
+ 'atom_entry': atom_entry,
+ },
+ # + headers (no HTTP_SLUG is sent)
+ HTTP_IN_PROGRESS='false')
+
+ # then
+ assert b'Missing SLUG header' in response.content
+ assert response.status_code == status.HTTP_400_BAD_REQUEST
+
+
+def test_post_deposit_multipart_zip(
+ authenticated_client, deposit_collection,
+ atom_dataset, sample_archive):
+ """one multipart deposit (zip+xml) should be accepted
+
+ The deposit must end up with 2 deposit requests: one holding the
+ archive, the other holding the metadata of the atom entry.
+
+ """
+ # given
+ url = reverse(COL_IRI, args=[deposit_collection.name])
+
+ # archive part, declared as a zip
+ archive = InMemoryUploadedFile(
+ BytesIO(sample_archive['data']),
+ field_name=sample_archive['name'],
+ name=sample_archive['name'],
+ content_type='application/zip',
+ size=sample_archive['length'],
+ charset=None)
+
+ # atom entry part
+ data_atom_entry = atom_dataset['entry-data-deposit-binary']
+ atom_entry = InMemoryUploadedFile(
+ BytesIO(data_atom_entry),
+ field_name='atom0',
+ name='atom0',
+ content_type='application/atom+xml; charset="utf-8"',
+ size=len(data_atom_entry),
+ charset='utf-8')
+
+ external_id = 'external-id'
+
+ # when
+ response = authenticated_client.post(
+ url,
+ format='multipart',
+ data={
+ 'archive': archive,
+ 'atom_entry': atom_entry,
+ },
+ # + headers
+ HTTP_IN_PROGRESS='false',
+ HTTP_SLUG=external_id)
+
+ # then
+ assert response.status_code == status.HTTP_201_CREATED
+
+ response_content = parse_xml(BytesIO(response.content))
+ deposit_id = response_content['deposit_id']
+
+ deposit = Deposit.objects.get(pk=deposit_id)
+ assert deposit.status == DEPOSIT_STATUS_DEPOSITED
+ assert deposit.external_id == external_id
+ assert deposit.collection == deposit_collection
+ assert deposit.swh_id is None
+
+ # 1 request of type 'archive' + 1 request for the metadata
+ deposit_requests = DepositRequest.objects.filter(deposit=deposit)
+ assert len(deposit_requests) == 2
+ for deposit_request in deposit_requests:
+ assert deposit_request.deposit == deposit
+ if deposit_request.type == 'archive':
+ check_archive(sample_archive['name'], deposit_request.archive.name)
+ assert deposit_request.metadata is None
+ assert deposit_request.raw_metadata is None
+ else:
+ # the raw metadata is the atom entry as it was sent
+ assert deposit_request.metadata['id'] == \
+ 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
+ assert deposit_request.raw_metadata == \
+ data_atom_entry.decode('utf-8')
+
+
+def test_post_deposit_multipart_tar(
+ authenticated_client, deposit_collection,
+ atom_dataset, sample_archive):
+ """one multipart deposit (tar+xml) should be accepted
-class DepositMultipartTestCase(APITestCase, WithAuthTestCase, BasicTestCase,
- FileSystemCreationRoutine):
- """Post multipart deposit scenario
+ """
+ # given
+ url = reverse(COL_IRI, args=[deposit_collection.name])
+
+ # atom entry sent alongside the archive
+ data_atom_entry = atom_dataset['entry-data-deposit-binary']
+
+ # archive part, declared as a tar
+ archive = InMemoryUploadedFile(
+ BytesIO(sample_archive['data']),
+ field_name=sample_archive['name'],
+ name=sample_archive['name'],
+ content_type='application/x-tar',
+ size=sample_archive['length'],
+ charset=None)
+
+ atom_entry = InMemoryUploadedFile(
+ BytesIO(data_atom_entry),
+ field_name='atom0',
+ name='atom0',
+ content_type='application/atom+xml; charset="utf-8"',
+ size=len(data_atom_entry),
+ charset='utf-8')
+
+ external_id = 'external-id'
+
+ # when
+ response = authenticated_client.post(
+ url,
+ format='multipart',
+ data={
+ 'archive': archive,
+ 'atom_entry': atom_entry,
+ },
+ # + headers
+ HTTP_IN_PROGRESS='false',
+ HTTP_SLUG=external_id)
+
+ # then
+ assert response.status_code == status.HTTP_201_CREATED
+
+ response_content = parse_xml(BytesIO(response.content))
+ deposit_id = response_content['deposit_id']
+
+ deposit = Deposit.objects.get(pk=deposit_id)
+ assert deposit.status == DEPOSIT_STATUS_DEPOSITED
+ assert deposit.external_id == external_id
+ assert deposit.collection == deposit_collection
+ assert deposit.swh_id is None
+
+ # 1 request of type 'archive' + 1 request for the metadata
+ deposit_requests = DepositRequest.objects.filter(deposit=deposit)
+ assert len(deposit_requests) == 2
+ for deposit_request in deposit_requests:
+ assert deposit_request.deposit == deposit
+ if deposit_request.type == 'archive':
+ check_archive(sample_archive['name'], deposit_request.archive.name)
+ assert deposit_request.metadata is None
+ assert deposit_request.raw_metadata is None
+ else:
+ # the raw metadata is the atom entry as it was sent
+ assert deposit_request.metadata['id'] == \
+ 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
+ assert deposit_request.raw_metadata == \
+ data_atom_entry.decode('utf-8')
+
+
+def test_post_deposit_multipart_put_to_replace_metadata(
+ authenticated_client, deposit_collection,
+ atom_dataset, sample_archive):
+ """One multipart deposit followed by a metadata update should be
+ accepted
"""
- def setUp(self):
- super().setUp()
-
- self.data_atom_entry_ok = b"""
-
- Title
- urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
- 2005-10-07T17:17:08Z
- Contributor
- The abstract
-
-
- The abstract
- Access Rights
- Alternative Title
- Date Available
- Bibliographic Citation # noqa
- Contributor
- Description
- Has Part
- Has Version
- Identifier
- Is Part Of
- Publisher
- References
- Rights Holder
- Source
- Title
- Type
-
-"""
-
- self.data_atom_entry_update_in_place = """
-
- urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b
- Title
- Type
-"""
-
- def test_post_deposit_multipart_without_slug_header_is_bad_request(self):
- # given
- url = reverse(COL_IRI, args=[self.collection.name])
- data_atom_entry = self.data_atom_entry_ok
-
- archive_content = b'some content representing archive'
- archive = InMemoryUploadedFile(
- BytesIO(archive_content),
- field_name='archive0',
- name='archive0',
- content_type='application/zip',
- size=len(archive_content),
- charset=None)
-
- atom_entry = InMemoryUploadedFile(
- BytesIO(data_atom_entry),
- field_name='atom0',
- name='atom0',
- content_type='application/atom+xml; charset="utf-8"',
- size=len(data_atom_entry),
- charset='utf-8')
-
- # when
- response = self.client.post(
- url,
- format='multipart',
- data={
- 'archive': archive,
- 'atom_entry': atom_entry,
- },
- # + headers
- HTTP_IN_PROGRESS='false')
-
- self.assertIn(b'Missing SLUG header', response.content)
- self.assertEqual(response.status_code,
- status.HTTP_400_BAD_REQUEST)
-
- def test_post_deposit_multipart_zip(self):
- """one multipart deposit (zip+xml) should be accepted
-
- """
- # given
- url = reverse(COL_IRI, args=[self.collection.name])
-
- # from django.core.files import uploadedfile
- data_atom_entry = self.data_atom_entry_ok
-
- archive = InMemoryUploadedFile(
- BytesIO(self.archive['data']),
- field_name=self.archive['name'],
- name=self.archive['name'],
- content_type='application/zip',
- size=self.archive['length'],
- charset=None)
-
- atom_entry = InMemoryUploadedFile(
- BytesIO(data_atom_entry),
- field_name='atom0',
- name='atom0',
- content_type='application/atom+xml; charset="utf-8"',
- size=len(data_atom_entry),
- charset='utf-8')
-
- external_id = 'external-id'
-
- # when
- response = self.client.post(
- url,
- format='multipart',
- data={
- 'archive': archive,
- 'atom_entry': atom_entry,
- },
- # + headers
- HTTP_IN_PROGRESS='false',
- HTTP_SLUG=external_id)
-
- # then
- self.assertEqual(response.status_code, status.HTTP_201_CREATED)
-
- response_content = parse_xml(BytesIO(response.content))
- deposit_id = response_content['deposit_id']
-
- deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
- self.assertEqual(deposit.external_id, external_id)
- self.assertEqual(deposit.collection, self.collection)
- self.assertEqual(deposit.client, self.user)
- self.assertIsNone(deposit.swh_id)
-
- deposit_requests = DepositRequest.objects.filter(deposit=deposit)
- self.assertEqual(len(deposit_requests), 2)
- for deposit_request in deposit_requests:
- self.assertEqual(deposit_request.deposit, deposit)
- if deposit_request.type == 'archive':
- self.assertRegex(deposit_request.archive.name,
- self.archive['name'])
- self.assertIsNone(deposit_request.metadata)
- self.assertIsNone(deposit_request.raw_metadata)
- else:
- self.assertEqual(
- deposit_request.metadata['id'],
- 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
- self.assertEqual(deposit_request.raw_metadata,
- data_atom_entry.decode('utf-8'))
-
- def test_post_deposit_multipart_tar(self):
- """one multipart deposit (tar+xml) should be accepted
-
- """
- # given
- url = reverse(COL_IRI, args=[self.collection.name])
-
- # from django.core.files import uploadedfile
- data_atom_entry = self.data_atom_entry_ok
-
- archive = InMemoryUploadedFile(
- BytesIO(self.archive['data']),
- field_name=self.archive['name'],
- name=self.archive['name'],
- content_type='application/x-tar',
- size=self.archive['length'],
- charset=None)
-
- atom_entry = InMemoryUploadedFile(
- BytesIO(data_atom_entry),
- field_name='atom0',
- name='atom0',
- content_type='application/atom+xml; charset="utf-8"',
- size=len(data_atom_entry),
- charset='utf-8')
-
- external_id = 'external-id'
-
- # when
- response = self.client.post(
- url,
- format='multipart',
- data={
- 'archive': archive,
- 'atom_entry': atom_entry,
- },
- # + headers
- HTTP_IN_PROGRESS='false',
- HTTP_SLUG=external_id)
-
- # then
- self.assertEqual(response.status_code, status.HTTP_201_CREATED)
-
- response_content = parse_xml(BytesIO(response.content))
- deposit_id = response_content['deposit_id']
-
- deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
- self.assertEqual(deposit.external_id, external_id)
- self.assertEqual(deposit.collection, self.collection)
- self.assertEqual(deposit.client, self.user)
- self.assertIsNone(deposit.swh_id)
-
- deposit_requests = DepositRequest.objects.filter(deposit=deposit)
- self.assertEqual(len(deposit_requests), 2)
- for deposit_request in deposit_requests:
- self.assertEqual(deposit_request.deposit, deposit)
- if deposit_request.type == 'archive':
- self.assertRegex(deposit_request.archive.name,
- self.archive['name'])
- self.assertIsNone(deposit_request.metadata)
- self.assertIsNone(deposit_request.raw_metadata)
- else:
- self.assertEqual(
- deposit_request.metadata['id'],
- 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
- self.assertEqual(deposit_request.raw_metadata,
- data_atom_entry.decode('utf-8'))
-
- def test_post_deposit_multipart_put_to_replace_metadata(self):
- """One multipart deposit followed by a metadata update should be
- accepted
-
- """
- # given
- url = reverse(COL_IRI, args=[self.collection.name])
-
- data_atom_entry = self.data_atom_entry_ok
-
- archive = InMemoryUploadedFile(
- BytesIO(self.archive['data']),
- field_name=self.archive['name'],
- name=self.archive['name'],
- content_type='application/zip',
- size=self.archive['length'],
- charset=None)
-
- atom_entry = InMemoryUploadedFile(
- BytesIO(data_atom_entry),
- field_name='atom0',
- name='atom0',
- content_type='application/atom+xml; charset="utf-8"',
- size=len(data_atom_entry),
- charset='utf-8')
-
- external_id = 'external-id'
-
- # when
- response = self.client.post(
- url,
- format='multipart',
- data={
- 'archive': archive,
- 'atom_entry': atom_entry,
- },
- # + headers
- HTTP_IN_PROGRESS='true',
- HTTP_SLUG=external_id)
-
- # then
- self.assertEqual(response.status_code, status.HTTP_201_CREATED)
-
- response_content = parse_xml(BytesIO(response.content))
- deposit_id = response_content['deposit_id']
-
- deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEqual(deposit.status, 'partial')
- self.assertEqual(deposit.external_id, external_id)
- self.assertEqual(deposit.collection, self.collection)
- self.assertEqual(deposit.client, self.user)
- self.assertIsNone(deposit.swh_id)
-
- deposit_requests = DepositRequest.objects.filter(deposit=deposit)
-
- self.assertEqual(len(deposit_requests), 2)
- for deposit_request in deposit_requests:
- self.assertEqual(deposit_request.deposit, deposit)
- if deposit_request.type == 'archive':
- self.assertRegex(deposit_request.archive.name,
- self.archive['name'])
- else:
- self.assertEqual(
- deposit_request.metadata['id'],
- 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
- self.assertEqual(deposit_request.raw_metadata,
- data_atom_entry.decode('utf-8'))
-
- replace_metadata_uri = response._headers['location'][1]
- response = self.client.put(
- replace_metadata_uri,
- content_type='application/atom+xml;type=entry',
- data=self.data_atom_entry_update_in_place,
- HTTP_IN_PROGRESS='false')
-
- self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
-
- # deposit_id did not change
- deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
- self.assertEqual(deposit.external_id, external_id)
- self.assertEqual(deposit.collection, self.collection)
- self.assertEqual(deposit.client, self.user)
- self.assertIsNone(deposit.swh_id)
-
- deposit_requests = DepositRequest.objects.filter(deposit=deposit)
- self.assertEqual(len(deposit_requests), 2)
- for deposit_request in deposit_requests:
- self.assertEqual(deposit_request.deposit, deposit)
- if deposit_request.type == 'archive':
- self.assertRegex(deposit_request.archive.name,
- self.archive['name'])
- else:
- self.assertEqual(
- deposit_request.metadata['id'],
- 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b')
- self.assertEqual(
- deposit_request.raw_metadata,
- self.data_atom_entry_update_in_place)
-
- # FAILURE scenarios
-
- def test_post_deposit_multipart_only_archive_and_atom_entry(self):
- """Multipart deposit only accepts one archive and one atom+xml"""
- # given
- url = reverse(COL_IRI, args=[self.collection.name])
-
- archive_content = b'some content representing archive'
- archive = InMemoryUploadedFile(BytesIO(archive_content),
- field_name='archive0',
- name='archive0',
- content_type='application/x-tar',
- size=len(archive_content),
- charset=None)
-
- other_archive_content = b"some-other-content"
- other_archive = InMemoryUploadedFile(BytesIO(other_archive_content),
- field_name='atom0',
- name='atom0',
- content_type='application/x-tar',
- size=len(other_archive_content),
- charset='utf-8')
-
- # when
- response = self.client.post(
- url,
- format='multipart',
- data={
- 'archive': archive,
- 'atom_entry': other_archive,
- },
- # + headers
- HTTP_IN_PROGRESS='false',
- HTTP_SLUG='external-id')
-
- # then
- self.assertEqual(response.status_code,
- status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
- self.assertTrue(
- 'Only 1 application/zip (or application/x-tar) archive' in
- response.content.decode('utf-8'))
-
- # when
- archive.seek(0)
- response = self.client.post(
- url,
- format='multipart',
- data={
- 'archive': archive,
- },
- # + headers
- HTTP_IN_PROGRESS='false',
- HTTP_SLUG='external-id')
-
- # then
- self.assertEqual(response.status_code,
- status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
- self.assertTrue(
- 'You must provide both 1 application/zip (or '
- 'application/x-tar) and 1 atom+xml entry for '
- 'multipart deposit' in response.content.decode('utf-8')
- )
-
- def test_post_deposit_multipart_400_when_badly_formatted_xml(self):
- # given
- url = reverse(COL_IRI, args=[self.collection.name])
-
- data_atom_entry_ko = b"""
-
-
- urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
-
-"""
-
- archive_content = b'some content representing archive'
- archive = InMemoryUploadedFile(
- BytesIO(archive_content),
- field_name='archive0',
- name='archive0',
- content_type='application/zip',
- size=len(archive_content),
- charset=None)
-
- atom_entry = InMemoryUploadedFile(
- BytesIO(data_atom_entry_ko),
- field_name='atom0',
- name='atom0',
- content_type='application/atom+xml; charset="utf-8"',
- size=len(data_atom_entry_ko),
- charset='utf-8')
-
- # when
- response = self.client.post(
- url,
- format='multipart',
- data={
- 'archive': archive,
- 'atom_entry': atom_entry,
- },
- # + headers
- HTTP_IN_PROGRESS='false',
- HTTP_SLUG='external-id',
- )
-
- self.assertIn(b'Malformed xml metadata', response.content)
- self.assertEqual(response.status_code,
- status.HTTP_400_BAD_REQUEST)
+ # given
+ url = reverse(COL_IRI, args=[deposit_collection.name])
+
+ data_atom_entry = atom_dataset['entry-data-deposit-binary']
+
+ archive = InMemoryUploadedFile(
+ BytesIO(sample_archive['data']),
+ field_name=sample_archive['name'],
+ name=sample_archive['name'],
+ content_type='application/zip',
+ size=sample_archive['length'],
+ charset=None)
+
+ atom_entry = InMemoryUploadedFile(
+ BytesIO(data_atom_entry),
+ field_name='atom0',
+ name='atom0',
+ content_type='application/atom+xml; charset="utf-8"',
+ size=len(data_atom_entry),
+ charset='utf-8')
+
+ external_id = 'external-id'
+
+ # when
+ response = authenticated_client.post(
+ url,
+ format='multipart',
+ data={
+ 'archive': archive,
+ 'atom_entry': atom_entry,
+ },
+ # + headers
+ HTTP_IN_PROGRESS='true',
+ HTTP_SLUG=external_id)
+
+ # then
+ assert response.status_code == status.HTTP_201_CREATED
+
+ response_content = parse_xml(BytesIO(response.content))
+ deposit_id = response_content['deposit_id']
+
+ deposit = Deposit.objects.get(pk=deposit_id)
+ assert deposit.status == 'partial'
+ assert deposit.external_id == external_id
+ assert deposit.collection == deposit_collection
+ assert deposit.swh_id is None
+
+ deposit_requests = DepositRequest.objects.filter(deposit=deposit)
+
+ assert len(deposit_requests) == 2
+ for deposit_request in deposit_requests:
+ assert deposit_request.deposit == deposit
+ if deposit_request.type == 'archive':
+ check_archive(sample_archive['name'], deposit_request.archive.name)
+ else:
+ assert deposit_request.metadata['id'] == \
+ 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
+ assert deposit_request.raw_metadata == \
+ data_atom_entry.decode('utf-8')
+
+ replace_metadata_uri = response._headers['location'][1]
+ response = authenticated_client.put(
+ replace_metadata_uri,
+ content_type='application/atom+xml;type=entry',
+ data=atom_dataset['entry-data-deposit-binary'],
+ HTTP_IN_PROGRESS='false')
+
+ assert response.status_code == status.HTTP_204_NO_CONTENT
+
+ # deposit_id did not change
+ deposit = Deposit.objects.get(pk=deposit_id)
+ assert deposit.status == DEPOSIT_STATUS_DEPOSITED
+ assert deposit.external_id == external_id
+ assert deposit.collection == deposit_collection
+ assert deposit.swh_id is None
+
+ deposit_requests = DepositRequest.objects.filter(deposit=deposit)
+ assert len(deposit_requests) == 2
+ for deposit_request in deposit_requests:
+ assert deposit_request.deposit == deposit
+ if deposit_request.type == 'archive':
+ check_archive(sample_archive['name'], deposit_request.archive.name)
+ else:
+ assert deposit_request.metadata['id'] == \
+ 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
+ assert deposit_request.raw_metadata == \
+ atom_dataset['entry-data-deposit-binary'].decode('utf-8')
+
+# FAILURE scenarios
+
+
+def test_post_deposit_multipart_only_archive_and_atom_entry(
+        authenticated_client, deposit_collection):
+    """A multipart deposit must carry exactly 1 archive and 1 atom entry.
+
+    Posting two archives, then posting an archive on its own, must both
+    be answered with a 415 and the matching error message.
+
+    """
+    collection_url = reverse(COL_IRI, args=[deposit_collection.name])
+    request_headers = {
+        'HTTP_IN_PROGRESS': 'false',
+        'HTTP_SLUG': 'external-id',
+    }
+
+    first_payload = b'some content representing archive'
+    first_tarball = InMemoryUploadedFile(
+        BytesIO(first_payload),
+        field_name='archive0',
+        name='archive0',
+        content_type='application/x-tar',
+        size=len(first_payload),
+        charset=None)
+
+    # a second tarball, uploaded in the slot meant for the atom entry
+    second_payload = b"some-other-content"
+    second_tarball = InMemoryUploadedFile(
+        BytesIO(second_payload),
+        field_name='atom0',
+        name='atom0',
+        content_type='application/x-tar',
+        size=len(second_payload),
+        charset='utf-8')
+
+    # scenario 1: two archives and no atom entry
+    response = authenticated_client.post(
+        collection_url,
+        format='multipart',
+        data={'archive': first_tarball, 'atom_entry': second_tarball},
+        **request_headers)
+
+    assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE
+    body = response.content.decode('utf-8')
+    assert 'Only 1 application/zip (or application/x-tar) archive' in body
+
+    # scenario 2: the archive alone (rewind the upload before re-sending)
+    first_tarball.seek(0)
+    response = authenticated_client.post(
+        collection_url,
+        format='multipart',
+        data={'archive': first_tarball},
+        **request_headers)
+
+    assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE
+    expected_error = (
+        'You must provide both 1 application/zip (or '
+        'application/x-tar) and 1 atom+xml entry for '
+        'multipart deposit')
+    assert expected_error in response.content.decode('utf-8')
+
+
+def test_post_deposit_multipart_400_when_badly_formatted_xml(
+        authenticated_client, deposit_collection,
+        sample_archive, atom_dataset):
+    """A multipart deposit whose atom entry is badly formatted xml is
+    rejected with a 400.
+
+    """
+    collection_url = reverse(COL_IRI, args=[deposit_collection.name])
+
+    zip_bytes = sample_archive['data']
+    zip_name = sample_archive['name']
+    uploaded_zip = InMemoryUploadedFile(
+        BytesIO(zip_bytes),
+        field_name=zip_name,
+        name=zip_name,
+        content_type='application/zip',
+        size=len(zip_bytes),
+        charset=None)
+
+    broken_xml = atom_dataset['entry-data-ko']
+    uploaded_atom = InMemoryUploadedFile(
+        BytesIO(broken_xml),
+        field_name='atom0',
+        name='atom0',
+        content_type='application/atom+xml; charset="utf-8"',
+        size=len(broken_xml),
+        charset='utf-8')
+
+    # final deposit (not in progress) sending both parts at once
+    response = authenticated_client.post(
+        collection_url,
+        format='multipart',
+        data={'archive': uploaded_zip, 'atom_entry': uploaded_atom},
+        HTTP_IN_PROGRESS='false',
+        HTTP_SLUG='external-id')
+
+    assert b'Malformed xml metadata' in response.content
+    assert response.status_code == status.HTTP_400_BAD_REQUEST
diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py
index 0d298477..4d41a1f2 100644
--- a/swh/deposit/tests/common.py
+++ b/swh/deposit/tests/common.py
@@ -1,568 +1,586 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import hashlib
import os
+import re
import shutil
import tarfile
import tempfile
from django.urls import reverse
from django.test import TestCase
from io import BytesIO
import pytest
from rest_framework import status
from swh.deposit.config import (COL_IRI, EM_IRI, EDIT_SE_IRI,
DEPOSIT_STATUS_PARTIAL,
DEPOSIT_STATUS_VERIFIED,
DEPOSIT_STATUS_REJECTED,
DEPOSIT_STATUS_DEPOSITED)
from swh.deposit.models import DepositClient, DepositCollection, Deposit
from swh.deposit.models import DepositRequest
from swh.deposit.parsers import parse_xml
from swh.deposit.settings.testing import MEDIA_ROOT
from swh.core import tarball
def compute_info(archive_path):
"""Given a path, compute information on path.
"""
with open(archive_path, 'rb') as f:
length = 0
sha1sum = hashlib.sha1()
md5sum = hashlib.md5()
data = b''
for chunk in f:
sha1sum.update(chunk)
md5sum.update(chunk)
length += len(chunk)
data += chunk
return {
'dir': os.path.dirname(archive_path),
'name': os.path.basename(archive_path),
'path': archive_path,
'length': length,
'sha1sum': sha1sum.hexdigest(),
'md5sum': md5sum.hexdigest(),
'data': data
}
def _compress(path, extension, dir_path):
"""Compress path according to extension
"""
if extension == 'zip' or extension == 'tar':
return tarball.compress(path, extension, dir_path)
elif '.' in extension:
split_ext = extension.split('.')
if split_ext[0] != 'tar':
raise ValueError(
'Development error, only zip or tar archive supported, '
'%s not supported' % extension)
# deal with specific tar
mode = split_ext[1]
supported_mode = ['xz', 'gz', 'bz2']
if mode not in supported_mode:
raise ValueError(
'Development error, only %s supported, %s not supported' % (
supported_mode, mode))
files = tarball._ls(dir_path)
with tarfile.open(path, 'w:%s' % mode) as t:
for fpath, fname in files:
t.add(fpath, arcname=fname, recursive=False)
return path
def create_arborescence_archive(root_path, archive_name, filename, content,
up_to_size=None, extension='zip'):
"""Build an archive named archive_name in the root_path.
This archive contains one file named filename with the content content.
Args:
root_path (str): Location path of the archive to create
archive_name (str): Archive's name (without extension)
filename (str): Archive's content is only one filename
content (bytes): Content of the filename
up_to_size (int | None): Fill in the blanks size to oversize
or complete an archive's size
extension (str): Extension of the archive to write (default is zip)
Returns:
dict with the keys:
- dir: the directory of that archive
- path: full path to the archive
- sha1sum: archive's sha1sum
- length: archive's length
"""
os.makedirs(root_path, exist_ok=True)
archive_path_dir = tempfile.mkdtemp(dir=root_path)
dir_path = os.path.join(archive_path_dir, archive_name)
os.mkdir(dir_path)
filepath = os.path.join(dir_path, filename)
_length = len(content)
count = 0
batch_size = 128
with open(filepath, 'wb') as f:
f.write(content)
if up_to_size: # fill with blank content up to a given size
count += _length
while count < up_to_size:
f.write(b'0'*batch_size)
count += batch_size
_path = '%s.%s' % (dir_path, extension)
_path = _compress(_path, extension, dir_path)
return compute_info(_path)
def create_archive_with_archive(root_path, name, archive):
"""Create an archive holding another.
"""
invalid_archive_path = os.path.join(root_path, name)
with tarfile.open(invalid_archive_path, 'w:gz') as _archive:
_archive.add(archive['path'], arcname=archive['name'])
return compute_info(invalid_archive_path)
@pytest.mark.fs
class FileSystemCreationRoutine(TestCase):
"""Mixin intended for tests needed to tamper with archives.
"""
def setUp(self):
"""Define the test client and other test variables."""
super().setUp()
self.root_path = '/tmp/swh-deposit/test/build-zip/'
os.makedirs(self.root_path, exist_ok=True)
self.archive = create_arborescence_archive(
self.root_path, 'archive1', 'file1', b'some content in file')
self.atom_entry = b"""
Awesome Compiler
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
2017-10-07T15:17:08Z
some awesome author
https://hal-test.archives-ouvertes.fr
"""
def tearDown(self):
super().tearDown()
shutil.rmtree(self.root_path)
def create_simple_binary_deposit(self, status_partial=True):
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/zip',
data=self.archive['data'],
CONTENT_LENGTH=self.archive['length'],
HTTP_MD5SUM=self.archive['md5sum'],
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial,
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive['name'], ))
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
_status = response_content['deposit_status']
if status_partial:
expected_status = DEPOSIT_STATUS_PARTIAL
else:
expected_status = DEPOSIT_STATUS_VERIFIED
self.assertEqual(_status, expected_status)
deposit_id = int(response_content['deposit_id'])
return deposit_id
def create_complex_binary_deposit(self, status_partial=False):
deposit_id = self.create_simple_binary_deposit(
status_partial=True)
# Add a second archive to the deposit
# update its status to DEPOSIT_STATUS_VERIFIED
response = self.client.post(
reverse(EM_IRI, args=[self.collection.name, deposit_id]),
content_type='application/zip',
data=self.archive2['data'],
CONTENT_LENGTH=self.archive2['length'],
HTTP_MD5SUM=self.archive2['md5sum'],
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial,
HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip')
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
return deposit_id
def create_deposit_archive_with_archive(self, archive_extension):
# we create the holding archive to a given extension
archive = create_arborescence_archive(
self.root_path, 'archive1', 'file1', b'some content in file',
extension=archive_extension)
# now we create an archive holding the first created archive
invalid_archive = create_archive_with_archive(
self.root_path, 'invalid.tar.gz', archive)
# we deposit it
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/x-tar',
data=invalid_archive['data'],
CONTENT_LENGTH=invalid_archive['length'],
HTTP_MD5SUM=invalid_archive['md5sum'],
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=False,
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
invalid_archive['name'], ))
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
_status = response_content['deposit_status']
self.assertEqual(_status, DEPOSIT_STATUS_DEPOSITED)
deposit_id = int(response_content['deposit_id'])
return deposit_id
def update_binary_deposit(self, deposit_id, status_partial=False):
# update existing deposit with atom entry metadata
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.codemeta_entry_data1,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
_status = response_content['deposit_status']
if status_partial:
expected_status = DEPOSIT_STATUS_PARTIAL
else:
expected_status = DEPOSIT_STATUS_DEPOSITED
self.assertEqual(_status, expected_status)
deposit_id = int(response_content['deposit_id'])
return deposit_id
@pytest.mark.fs
class BasicTestCase(TestCase):
"""Mixin intended for data setup purposes (user, collection, etc...)
"""
def setUp(self):
"""Define the test client and other test variables."""
super().setUp()
# expanding diffs in tests
self.maxDiff = None
# basic minimum test data
_name = 'hal'
_provider_url = 'https://hal-test.archives-ouvertes.fr/'
_domain = 'archives-ouvertes.fr/'
# set collection up
_collection = DepositCollection(name=_name)
_collection.save()
# set user/client up
_client = DepositClient.objects.create_user(username=_name,
password=_name,
provider_url=_provider_url,
domain=_domain)
_client.collections = [_collection.id]
_client.last_name = _name
_client.save()
self.collection = _collection
self.user = _client
self.username = _name
self.userpass = _name
def tearDown(self):
super().tearDown()
# Clean up uploaded files in temporary directory (tests have
# their own media root folder)
if os.path.exists(MEDIA_ROOT):
for d in os.listdir(MEDIA_ROOT):
shutil.rmtree(os.path.join(MEDIA_ROOT, d))
class WithAuthTestCase(TestCase):
"""Mixin intended for testing the api with basic authentication.
"""
def setUp(self):
super().setUp()
_token = '%s:%s' % (self.username, self.userpass)
token = base64.b64encode(_token.encode('utf-8'))
authorization = 'Basic %s' % token.decode('utf-8')
self.client.credentials(HTTP_AUTHORIZATION=authorization)
def tearDown(self):
super().tearDown()
self.client.credentials()
class CommonCreationRoutine(TestCase):
"""Mixin class to share initialization routine.
cf:
`class`:test_deposit_update.DepositReplaceExistingDataTest
`class`:test_deposit_update.DepositUpdateDepositWithNewDataTest
`class`:test_deposit_update.DepositUpdateFailuresTest
`class`:test_deposit_delete.DepositDeleteTest
"""
def setUp(self):
super().setUp()
self.atom_entry_data0 = b"""
some-external-id
https://hal-test.archives-ouvertes.fr/some-external-id
some awesome author
"""
self.atom_entry_data1 = b"""
another one
no one
2017-10-07T15:17:08Z
"""
self.atom_entry_data2 = b"""
Awesome Compiler
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
2017-10-07T15:17:08Z
some awesome author
https://hal-test.archives-ouvertes.fr/id
"""
self.codemeta_entry_data0 = b"""
Awesome Compiler
https://hal-test.archives-ouvertes.fr/1785io25c695
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
2017-10-07T15:17:08Z
some awesome author
description
key-word 1
"""
self.codemeta_entry_data1 = b"""
Composing a Web of Audio Applications
hal
hal-01243065
hal-01243065
https://hal-test.archives-ouvertes.fr/hal-01243065
test
DSP programming,Web
2017-05-03T16:08:47+02:00
this is the description
1
phpstorm
stable
php
python
C
GNU General Public License v3.0 only
CeCILL Free Software License Agreement v1.1
HAL
hal@ccsd.cnrs.fr
Morane Gruenpeter
"""
def create_deposit_with_invalid_archive(self,
external_id='some-external-id-1'):
url = reverse(COL_IRI, args=[self.collection.name])
data = b'some data which is clearly not a zip file'
md5sum = hashlib.md5(data).hexdigest()
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=data,
# + headers
CONTENT_LENGTH=len(data),
# other headers needs HTTP_ prefix to be taken into account
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=md5sum,
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
return deposit_id
def create_deposit_with_status(
self, status,
external_id='some-external-id-1',
swh_id=None,
swh_id_context=None,
swh_anchor_id=None,
swh_anchor_id_context=None,
status_detail=None):
# create an invalid deposit which we will update further down the line
deposit_id = self.create_deposit_with_invalid_archive(external_id)
# We cannot create some form of deposit with a given status in
# test context ('rejected' for example). Update in place the
# deposit with such status to permit some further tests.
deposit = Deposit.objects.get(pk=deposit_id)
if status == DEPOSIT_STATUS_REJECTED:
deposit.status_detail = status_detail
deposit.status = status
if swh_id:
deposit.swh_id = swh_id
if swh_id_context:
deposit.swh_id_context = swh_id_context
if swh_anchor_id:
deposit.swh_anchor_id = swh_anchor_id
if swh_anchor_id_context:
deposit.swh_anchor_id_context = swh_anchor_id_context
deposit.save()
return deposit_id
def create_simple_deposit_partial(self, external_id='some-external-id'):
"""Create a simple deposit (1 request) in `partial` state and returns
its new identifier.
Returns:
deposit id
"""
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0,
HTTP_SLUG=external_id,
HTTP_IN_PROGRESS='true')
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
return deposit_id
def create_deposit_partial_with_data_in_args(self, data):
"""Create a simple deposit (1 request) in `partial` state with the data
or metadata as an argument and returns its new identifier.
Args:
data: atom entry
Returns:
deposit id
"""
if isinstance(data, str):
data = data.encode('utf-8')
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=data,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS='true')
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
return deposit_id
def _update_deposit_with_status(self, deposit_id, status_partial=False):
"""Add to a given deposit another archive and update its current
status to `deposited` (by default).
Returns:
deposit id
"""
# when
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial)
# then
assert response.status_code == status.HTTP_201_CREATED
return deposit_id
def create_deposit_ready(self, external_id='some-external-id'):
"""Create a complex deposit (2 requests) in status `deposited`.
"""
deposit_id = self.create_simple_deposit_partial(
external_id=external_id)
deposit_id = self._update_deposit_with_status(deposit_id)
return deposit_id
def create_deposit_partial(self, external_id='some-external-id'):
"""Create a complex deposit (2 requests) in status `partial`.
"""
deposit_id = self.create_simple_deposit_partial(
external_id=external_id)
deposit_id = self._update_deposit_with_status(
deposit_id, status_partial=True)
return deposit_id
def add_metadata_to_deposit(self, deposit_id, status_partial=False):
"""Add metadata to deposit.
"""
# when
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.codemeta_entry_data1,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial)
assert response.status_code == status.HTTP_201_CREATED
# then
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit is not None
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
assert deposit_requests is not []
for dr in deposit_requests:
if dr.type == 'metadata':
assert deposit_requests[0].metadata is not {}
return deposit_id
+
+
+def check_archive(archive_name: str, archive_name_to_check: str):
+    """Helper function to ensure archive_name is present within the
+    archive_name_to_check.
+
+    archive_name_to_check must hold a '/' followed by archive_name. When
+    archive_name has an extension, any text is tolerated between its
+    stem and that extension (e.g. 'archive.zip' is found within
+    'client_1/archive_noisynoise.zip').
+
+    Args:
+        archive_name: Reference name, with or without extension. Only the
+            last '.' separates the stem from the extension, so names such
+            as 'archive.tar.gz' are supported.
+        archive_name_to_check: Name (path) expected to hold archive_name
+
+    Raises:
+        AssertionError if archive_name is not present within
+        archive_name_to_check
+
+    """
+    # archive_name is matched literally: escape it so that characters
+    # such as '.', '+' or '(' are not interpreted as regex operators
+    if '.' in archive_name:
+        # split on the last dot only, a name holding several dots would
+        # otherwise fail to unpack (ValueError instead of AssertionError)
+        filename, extension = archive_name.rsplit('.', 1)
+        pattern = re.compile('.*/%s.*\\.%s' % (
+            re.escape(filename), re.escape(extension)))
+    else:
+        pattern = re.compile('.*/%s' % re.escape(archive_name))
+    # explicit raise (rather than assert) so the check is kept under -O
+    if pattern.match(archive_name_to_check) is None:
+        raise AssertionError('%r not found within %r' % (
+            archive_name, archive_name_to_check))
diff --git a/swh/deposit/tests/test_common.py b/swh/deposit/tests/test_common.py
new file mode 100644
index 00000000..588a4675
--- /dev/null
+++ b/swh/deposit/tests/test_common.py
@@ -0,0 +1,26 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.deposit.tests.common import check_archive
+
+
+def test_check_archive_helper():
+    """check_archive accepts matching names and raises AssertionError on
+    mismatching ones.
+
+    """
+    matching = (
+        ('filename0', 'something/filename0'),
+        ('archive.zip', 'client_1/archive_noisynoise.zip'),
+    )
+    mismatching = (
+        ('filename0', 'something-filename0'),
+        ('archive.zip', 'client_1_archive_noisynoise.zip'),
+        ('reference', 'irrelevant'),
+    )
+
+    # success: none of these calls must raise
+    for reference, candidate in matching:
+        check_archive(reference, candidate)
+
+    # failures: each of these calls must raise
+    for reference, candidate in mismatching:
+        with pytest.raises(AssertionError):
+            check_archive(reference, candidate)