diff --git a/swh/deposit/tests/api/data/atom/entry-data-ko.xml b/swh/deposit/tests/api/data/atom/entry-data-ko.xml new file mode 100644 index 00000000..3f5d8802 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data-ko.xml @@ -0,0 +1,6 @@ +<?xml version="1.0"?> +<entry xmlns="http://www.w3.org/2005/Atom" + xmlns:dcterms="http://purl.org/dc/terms/"> + <titleTitle</title> + <id>urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a</id> +</entry> diff --git a/swh/deposit/tests/api/data/atom/entry-update-in-place.xml b/swh/deposit/tests/api/data/atom/entry-update-in-place.xml new file mode 100644 index 00000000..1a7d7bbb --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-update-in-place.xml @@ -0,0 +1,7 @@ +<?xml version="1.0"?> +<entry xmlns="http://www.w3.org/2005/Atom" + xmlns:dcterms="http://purl.org/dc/terms/"> + <id>urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b</id> + <dcterms:title>Title</dcterms:title> + <dcterms:type>Type</dcterms:type> +</entry> diff --git a/swh/deposit/tests/api/test_deposit_binary.py b/swh/deposit/tests/api/test_deposit_binary.py index d4f55c07..7d3eac5d 100644 --- a/swh/deposit/tests/api/test_deposit_binary.py +++ b/swh/deposit/tests/api/test_deposit_binary.py @@ -1,580 +1,544 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -import re from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse from io import BytesIO from rest_framework import status from swh.deposit.tests import TEST_CONFIG from swh.deposit.config import ( COL_IRI, EM_IRI, DEPOSIT_STATUS_DEPOSITED, ) from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml -from swh.deposit.tests.common import create_arborescence_archive - - -def check_archive(archive_name: str, 
archive_name_to_check: str): - """Helper function to ensure archive_name is present within the - archive_name_to_check. - - Raises: - AssertionError if archive_name is not present within - archive_name_to_check - - """ - if '.' in archive_name: - filename, extension = archive_name.split('.') - pattern = re.compile('.*/%s.*\\.%s' % (filename, extension)) - else: - pattern = re.compile('.*/%s' % archive_name) - assert pattern.match(archive_name_to_check) is not None - - -def test_check_archive_helper(): - # success - for archive_name, archive_name_to_check in [ - ('filename0', 'something/filename0'), - ('archive.zip', 'client_1/archive_noisynoise.zip'), - ]: - check_archive(archive_name, archive_name_to_check) - - # failures - for archive_name, archive_name_to_check in [ - ('filename0', 'something-filename0'), - ('archive.zip', 'client_1_archive_noisynoise.zip'), - ('reference', 'irrelevant'), - ]: - with pytest.raises(AssertionError): - check_archive(archive_name, archive_name_to_check) +from swh.deposit.tests.common import create_arborescence_archive, check_archive def test_post_deposit_binary_no_slug( authenticated_client, deposit_collection, sample_archive): """Posting a binary deposit without slug header should return 400 """ url = reverse(COL_IRI, args=[deposit_collection.name]) # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') assert b'Missing SLUG header' in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_binary_support( authenticated_client, deposit_collection, sample_archive): """Binary upload with content-type not in [zip,x-tar] should return 415 """ # given url = reverse(COL_IRI, 
args=[deposit_collection.name]) external_id = 'some-external-id-1' # when response = authenticated_client.post( url, content_type='application/octet-stream', data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_ok( authenticated_client, deposit_collection, sample_archive): """Binary upload with correct headers should return 201 with receipt """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id-1' # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( sample_archive['name'], )) # then response_content = parse_xml(BytesIO(response.content)) assert response.status_code == status.HTTP_201_CREATED deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_request = DepositRequest.objects.get(deposit=deposit) check_archive(sample_archive['name'], deposit_request.archive.name) assert deposit_request.metadata is None assert deposit_request.raw_metadata is None response_content = 
parse_xml(BytesIO(response.content)) assert response_content['deposit_archive'] == sample_archive['name'] assert int(response_content['deposit_id']) == deposit.id assert response_content['deposit_status'] == deposit.status edit_se_iri = reverse('edit_se_iri', args=[deposit_collection.name, deposit.id]) - assert response._headers['location'] == ( - 'Location', 'http://testserver' + edit_se_iri) + # use the public header API; _headers is private to Django + assert response['Location'] == 'http://testserver' + edit_se_iri def test_post_deposit_binary_failure_unsupported_packaging_header( authenticated_client, deposit_collection, sample_archive): - """Bin deposit without supported content_disposition header returns 400 + """Binary deposit with an unsupported packaging header should return 400 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id' # when response = authenticated_client.post( url, content_type='application/zip', data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='something-unsupported', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_400_BAD_REQUEST with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_no_content_disposition_header( authenticated_client, deposit_collection, sample_archive): """Binary upload without content_disposition header should return 400 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id' # when response = authenticated_client.post( url, content_type='application/zip', data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false') # then assert response.status_code == status.HTTP_400_BAD_REQUEST with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_mediation_not_supported( 
authenticated_client, deposit_collection, sample_archive): """Binary upload with mediation should return a 412 response """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id-1' # when response = authenticated_client.post( url, content_type='application/zip', data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_ON_BEHALF_OF='someone', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_412_PRECONDITION_FAILED with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded( authenticated_client, deposit_collection, sample_archive, tmp_path): """Binary upload must not exceed the limit set up... """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) archive = create_arborescence_archive( tmp_path, 'archive2', 'file2', b'some content in file', up_to_size=TEST_CONFIG['max_upload_size']) external_id = 'some-external-id' # when response = authenticated_client.post( url, content_type='application/zip', data=archive['data'], # + headers CONTENT_LENGTH=archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE assert b'Upload size limit exceeded' in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_2_post_2_different_deposits( authenticated_client, deposit_collection, sample_archive): """2 posting deposits should return 2 different 201 with receipt """ url 
= reverse(COL_IRI, args=[deposit_collection.name]) # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG='some-external-id-1', HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) deposits = Deposit.objects.all() assert len(deposits) == 1 assert deposits[0] == deposit # second post response = authenticated_client.post( url, - content_type='application/x-tar', # as zip + content_type='application/x-tar', # as tar data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG='another-external-id', HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename1') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id2 = response_content['deposit_id'] deposit2 = Deposit.objects.get(pk=deposit_id2) assert deposit != deposit2 deposits = Deposit.objects.all().order_by('id') assert len(deposits) == 2 - assert list(deposits), [deposit == deposit2] + assert list(deposits) == [deposit, deposit2] def test_post_deposit_binary_and_post_to_add_another_archive( authenticated_client, deposit_collection, sample_archive, tmp_path): """Updating a deposit should return a 201 with receipt """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id-1' # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, 
HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='true', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( sample_archive['name'], )) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == 'partial' assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit assert deposit_request.type == 'archive' check_archive(sample_archive['name'], deposit_request.archive.name) # 2nd archive to upload archive2 = create_arborescence_archive( tmp_path, 'archive2', 'file2', b'some other content in file') # uri to update the content update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) # adding another archive for the deposit and finalizing it response = authenticated_client.post( update_uri, content_type='application/zip', # as zip data=archive2['data'], # + headers CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( archive2['name'])) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_requests = list(DepositRequest.objects.filter(deposit=deposit). 
order_by('id')) # 2 deposit requests for the same deposit assert len(deposit_requests) == 2 assert deposit_requests[0].deposit == deposit assert deposit_requests[0].type == 'archive' check_archive(sample_archive['name'], deposit_requests[0].archive.name) assert deposit_requests[1].deposit == deposit assert deposit_requests[1].type == 'archive' check_archive(archive2['name'], deposit_requests[1].archive.name) # only 1 deposit in db deposits = Deposit.objects.all() assert len(deposits) == 1 def test_post_deposit_then_update_refused( authenticated_client, deposit_collection, sample_archive, atom_dataset, tmp_path): """Updating a deposit with status 'ready' should return a 400 """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id-1' # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit check_archive('filename0', deposit_request.archive.name) # updating/adding is forbidden # uri to update the content edit_se_iri = reverse( 'edit_se_iri', args=[deposit_collection.name, deposit_id]) em_iri = reverse( 'em_iri', args=[deposit_collection.name, deposit_id]) # Testing all update/add endpoint should fail # since the 
status is ready archive2 = create_arborescence_archive( tmp_path, 'archive2', 'file2', b'some content in file 2') # replacing file is no longer possible since the deposit's # status is ready r = authenticated_client.put( em_iri, content_type='application/zip', data=archive2['data'], CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') assert r.status_code == status.HTTP_400_BAD_REQUEST # adding file is no longer possible since the deposit's status # is ready r = authenticated_client.post( em_iri, content_type='application/zip', data=archive2['data'], CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') assert r.status_code == status.HTTP_400_BAD_REQUEST # replacing metadata is no longer possible since the deposit's # status is ready r = authenticated_client.put( edit_se_iri, content_type='application/atom+xml;type=entry', data=atom_dataset['entry-data-deposit-binary'], CONTENT_LENGTH=len(atom_dataset['entry-data-deposit-binary']), HTTP_SLUG=external_id) assert r.status_code == status.HTTP_400_BAD_REQUEST # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( edit_se_iri, content_type='application/atom+xml;type=entry', data=atom_dataset['entry-data-deposit-binary'], CONTENT_LENGTH=len(atom_dataset['entry-data-deposit-binary']), HTTP_SLUG=external_id) assert r.status_code == status.HTTP_400_BAD_REQUEST archive_content = b'some content representing archive' archive = InMemoryUploadedFile( BytesIO(archive_content), field_name='archive0', name='archive0', content_type='application/zip', size=len(archive_content), charset=None) atom_entry = 
InMemoryUploadedFile( BytesIO(atom_dataset['entry-data-deposit-binary']), field_name='atom0', name='atom0', content_type='application/atom+xml; charset="utf-8"', size=len(atom_dataset['entry-data-deposit-binary']), charset='utf-8') # replacing multipart metadata is no longer possible since the # deposit's status is ready r = authenticated_client.put( edit_se_iri, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }) assert r.status_code == status.HTTP_400_BAD_REQUEST # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( edit_se_iri, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }) assert r.status_code == status.HTTP_400_BAD_REQUEST diff --git a/swh/deposit/tests/api/test_deposit_multipart.py b/swh/deposit/tests/api/test_deposit_multipart.py index 05a03832..d9420f8d 100644 --- a/swh/deposit/tests/api/test_deposit_multipart.py +++ b/swh/deposit/tests/api/test_deposit_multipart.py @@ -1,448 +1,389 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse from io import BytesIO from rest_framework import status -from rest_framework.test import APITestCase -from swh.deposit.config import COL_IRI -from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED +from swh.deposit.config import ( + COL_IRI, DEPOSIT_STATUS_DEPOSITED +) from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml -from ..common import BasicTestCase, WithAuthTestCase -from ..common import FileSystemCreationRoutine +from swh.deposit.tests.common import check_archive + + +def test_post_deposit_multipart_without_slug_header_is_bad_request( + authenticated_client, 
deposit_collection, atom_dataset): + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + archive_content = b'some content representing archive' + archive = InMemoryUploadedFile( + BytesIO(archive_content), + field_name='archive0', + name='archive0', + content_type='application/zip', + size=len(archive_content), + charset=None) + + data_atom_entry = atom_dataset['entry-data-deposit-binary'] + atom_entry = InMemoryUploadedFile( + BytesIO(data_atom_entry), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(data_atom_entry), + charset='utf-8') + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }, + # + headers + HTTP_IN_PROGRESS='false') + + assert b'Missing SLUG header' in response.content + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_post_deposit_multipart_zip( + authenticated_client, deposit_collection, + atom_dataset, sample_archive): + """one multipart deposit (zip+xml) should be accepted + """ + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + archive = InMemoryUploadedFile( + BytesIO(sample_archive['data']), + field_name=sample_archive['name'], + name=sample_archive['name'], + content_type='application/zip', + size=sample_archive['length'], + charset=None) + + data_atom_entry = atom_dataset['entry-data-deposit-binary'] + atom_entry = InMemoryUploadedFile( + BytesIO(data_atom_entry), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(data_atom_entry), + charset='utf-8') + + external_id = 'external-id' + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }, + # + headers + HTTP_IN_PROGRESS='false', + HTTP_SLUG=external_id) + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = 
parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + + deposit_requests = DepositRequest.objects.filter(deposit=deposit) + assert len(deposit_requests) == 2 + for deposit_request in deposit_requests: + assert deposit_request.deposit == deposit + if deposit_request.type == 'archive': + check_archive(sample_archive['name'], deposit_request.archive.name) + assert deposit_request.metadata is None + assert deposit_request.raw_metadata is None + else: + assert deposit_request.metadata['id'] == \ + 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + assert deposit_request.raw_metadata == \ + data_atom_entry.decode('utf-8') + + +def test_post_deposit_multipart_tar( + authenticated_client, deposit_collection, + atom_dataset, sample_archive): + """one multipart deposit (tar+xml) should be accepted -class DepositMultipartTestCase(APITestCase, WithAuthTestCase, BasicTestCase, - FileSystemCreationRoutine): - """Post multipart deposit scenario + """ + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + # from django.core.files import uploadedfile + data_atom_entry = atom_dataset['entry-data-deposit-binary'] + + archive = InMemoryUploadedFile( + BytesIO(sample_archive['data']), + field_name=sample_archive['name'], + name=sample_archive['name'], + content_type='application/x-tar', + size=sample_archive['length'], + charset=None) + + atom_entry = InMemoryUploadedFile( + BytesIO(data_atom_entry), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(data_atom_entry), + charset='utf-8') + + external_id = 'external-id' + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }, + # + 
headers + HTTP_IN_PROGRESS='false', + HTTP_SLUG=external_id) + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + + deposit_requests = DepositRequest.objects.filter(deposit=deposit) + assert len(deposit_requests) == 2 + for deposit_request in deposit_requests: + assert deposit_request.deposit == deposit + if deposit_request.type == 'archive': + check_archive(sample_archive['name'], deposit_request.archive.name) + assert deposit_request.metadata is None + assert deposit_request.raw_metadata is None + else: + assert deposit_request.metadata['id'] == \ + 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + assert deposit_request.raw_metadata == \ + data_atom_entry.decode('utf-8') + + +def test_post_deposit_multipart_put_to_replace_metadata( + authenticated_client, deposit_collection, + atom_dataset, sample_archive): + """One multipart deposit followed by a metadata update should be + accepted """ - def setUp(self): - super().setUp() - - self.data_atom_entry_ok = b"""<?xml version="1.0"?> -<entry xmlns="http://www.w3.org/2005/Atom" - xmlns:dcterms="http://purl.org/dc/terms/"> - <title>Title</title> - <id>urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a</id> - <updated>2005-10-07T17:17:08Z</updated> - <author><name>Contributor</name></author> - <summary type="text">The abstract</summary> - - <!-- some embedded metadata --> - <dcterms:abstract>The abstract</dcterms:abstract> - <dcterms:accessRights>Access Rights</dcterms:accessRights> - <dcterms:alternative>Alternative Title</dcterms:alternative> - <dcterms:available>Date Available</dcterms:available> - <dcterms:bibliographicCitation>Bibliographic 
Citation</dcterms:bibliographicCitation> # noqa - <dcterms:contributor>Contributor</dcterms:contributor> - <dcterms:description>Description</dcterms:description> - <dcterms:hasPart>Has Part</dcterms:hasPart> - <dcterms:hasVersion>Has Version</dcterms:hasVersion> - <dcterms:identifier>Identifier</dcterms:identifier> - <dcterms:isPartOf>Is Part Of</dcterms:isPartOf> - <dcterms:publisher>Publisher</dcterms:publisher> - <dcterms:references>References</dcterms:references> - <dcterms:rightsHolder>Rights Holder</dcterms:rightsHolder> - <dcterms:source>Source</dcterms:source> - <dcterms:title>Title</dcterms:title> - <dcterms:type>Type</dcterms:type> - -</entry>""" - - self.data_atom_entry_update_in_place = """<?xml version="1.0"?> -<entry xmlns="http://www.w3.org/2005/Atom" - xmlns:dcterms="http://purl.org/dc/terms/"> - <id>urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b</id> - <dcterms:title>Title</dcterms:title> - <dcterms:type>Type</dcterms:type> -</entry>""" - - def test_post_deposit_multipart_without_slug_header_is_bad_request(self): - # given - url = reverse(COL_IRI, args=[self.collection.name]) - data_atom_entry = self.data_atom_entry_ok - - archive_content = b'some content representing archive' - archive = InMemoryUploadedFile( - BytesIO(archive_content), - field_name='archive0', - name='archive0', - content_type='application/zip', - size=len(archive_content), - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset='utf-8') - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }, - # + headers - HTTP_IN_PROGRESS='false') - - self.assertIn(b'Missing SLUG header', response.content) - self.assertEqual(response.status_code, - status.HTTP_400_BAD_REQUEST) - - def test_post_deposit_multipart_zip(self): - """one multipart deposit (zip+xml) 
should be accepted - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - # from django.core.files import uploadedfile - data_atom_entry = self.data_atom_entry_ok - - archive = InMemoryUploadedFile( - BytesIO(self.archive['data']), - field_name=self.archive['name'], - name=self.archive['name'], - content_type='application/zip', - size=self.archive['length'], - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset='utf-8') - - external_id = 'external-id' - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }, - # + headers - HTTP_IN_PROGRESS='false', - HTTP_SLUG=external_id) - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_requests = DepositRequest.objects.filter(deposit=deposit) - self.assertEqual(len(deposit_requests), 2) - for deposit_request in deposit_requests: - self.assertEqual(deposit_request.deposit, deposit) - if deposit_request.type == 'archive': - self.assertRegex(deposit_request.archive.name, - self.archive['name']) - self.assertIsNone(deposit_request.metadata) - self.assertIsNone(deposit_request.raw_metadata) - else: - self.assertEqual( - deposit_request.metadata['id'], - 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a') - self.assertEqual(deposit_request.raw_metadata, - data_atom_entry.decode('utf-8')) - - def test_post_deposit_multipart_tar(self): - """one 
multipart deposit (tar+xml) should be accepted - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - # from django.core.files import uploadedfile - data_atom_entry = self.data_atom_entry_ok - - archive = InMemoryUploadedFile( - BytesIO(self.archive['data']), - field_name=self.archive['name'], - name=self.archive['name'], - content_type='application/x-tar', - size=self.archive['length'], - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset='utf-8') - - external_id = 'external-id' - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }, - # + headers - HTTP_IN_PROGRESS='false', - HTTP_SLUG=external_id) - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_requests = DepositRequest.objects.filter(deposit=deposit) - self.assertEqual(len(deposit_requests), 2) - for deposit_request in deposit_requests: - self.assertEqual(deposit_request.deposit, deposit) - if deposit_request.type == 'archive': - self.assertRegex(deposit_request.archive.name, - self.archive['name']) - self.assertIsNone(deposit_request.metadata) - self.assertIsNone(deposit_request.raw_metadata) - else: - self.assertEqual( - deposit_request.metadata['id'], - 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a') - self.assertEqual(deposit_request.raw_metadata, - data_atom_entry.decode('utf-8')) - - def 
test_post_deposit_multipart_put_to_replace_metadata(self): - """One multipart deposit followed by a metadata update should be - accepted - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - data_atom_entry = self.data_atom_entry_ok - - archive = InMemoryUploadedFile( - BytesIO(self.archive['data']), - field_name=self.archive['name'], - name=self.archive['name'], - content_type='application/zip', - size=self.archive['length'], - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset='utf-8') - - external_id = 'external-id' - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }, - # + headers - HTTP_IN_PROGRESS='true', - HTTP_SLUG=external_id) - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, 'partial') - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_requests = DepositRequest.objects.filter(deposit=deposit) - - self.assertEqual(len(deposit_requests), 2) - for deposit_request in deposit_requests: - self.assertEqual(deposit_request.deposit, deposit) - if deposit_request.type == 'archive': - self.assertRegex(deposit_request.archive.name, - self.archive['name']) - else: - self.assertEqual( - deposit_request.metadata['id'], - 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a') - self.assertEqual(deposit_request.raw_metadata, - data_atom_entry.decode('utf-8')) - - replace_metadata_uri = response._headers['location'][1] - response = self.client.put( - 
replace_metadata_uri, - content_type='application/atom+xml;type=entry', - data=self.data_atom_entry_update_in_place, - HTTP_IN_PROGRESS='false') - - self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) - - # deposit_id did not change - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_requests = DepositRequest.objects.filter(deposit=deposit) - self.assertEqual(len(deposit_requests), 2) - for deposit_request in deposit_requests: - self.assertEqual(deposit_request.deposit, deposit) - if deposit_request.type == 'archive': - self.assertRegex(deposit_request.archive.name, - self.archive['name']) - else: - self.assertEqual( - deposit_request.metadata['id'], - 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b') - self.assertEqual( - deposit_request.raw_metadata, - self.data_atom_entry_update_in_place) - - # FAILURE scenarios - - def test_post_deposit_multipart_only_archive_and_atom_entry(self): - """Multipart deposit only accepts one archive and one atom+xml""" - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - archive_content = b'some content representing archive' - archive = InMemoryUploadedFile(BytesIO(archive_content), - field_name='archive0', - name='archive0', - content_type='application/x-tar', - size=len(archive_content), - charset=None) - - other_archive_content = b"some-other-content" - other_archive = InMemoryUploadedFile(BytesIO(other_archive_content), - field_name='atom0', - name='atom0', - content_type='application/x-tar', - size=len(other_archive_content), - charset='utf-8') - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': other_archive, - }, - # + headers - HTTP_IN_PROGRESS='false', - 
HTTP_SLUG='external-id') - - # then - self.assertEqual(response.status_code, - status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) - self.assertTrue( - 'Only 1 application/zip (or application/x-tar) archive' in - response.content.decode('utf-8')) - - # when - archive.seek(0) - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - }, - # + headers - HTTP_IN_PROGRESS='false', - HTTP_SLUG='external-id') - - # then - self.assertEqual(response.status_code, - status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) - self.assertTrue( - 'You must provide both 1 application/zip (or ' - 'application/x-tar) and 1 atom+xml entry for ' - 'multipart deposit' in response.content.decode('utf-8') - ) - - def test_post_deposit_multipart_400_when_badly_formatted_xml(self): - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - data_atom_entry_ko = b"""<?xml version="1.0"?> -<entry xmlns="http://www.w3.org/2005/Atom" - xmlns:dcterms="http://purl.org/dc/terms/"> - <titleTitle</title> - <id>urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a</id> -</entry> -""" - - archive_content = b'some content representing archive' - archive = InMemoryUploadedFile( - BytesIO(archive_content), - field_name='archive0', - name='archive0', - content_type='application/zip', - size=len(archive_content), - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry_ko), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry_ko), - charset='utf-8') - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }, - # + headers - HTTP_IN_PROGRESS='false', - HTTP_SLUG='external-id', - ) - - self.assertIn(b'Malformed xml metadata', response.content) - self.assertEqual(response.status_code, - status.HTTP_400_BAD_REQUEST) + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + data_atom_entry = 
atom_dataset['entry-data-deposit-binary'] + + archive = InMemoryUploadedFile( + BytesIO(sample_archive['data']), + field_name=sample_archive['name'], + name=sample_archive['name'], + content_type='application/zip', + size=sample_archive['length'], + charset=None) + + atom_entry = InMemoryUploadedFile( + BytesIO(data_atom_entry), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(data_atom_entry), + charset='utf-8') + + external_id = 'external-id' + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }, + # + headers + HTTP_IN_PROGRESS='true', + HTTP_SLUG=external_id) + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == 'partial' + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + + deposit_requests = DepositRequest.objects.filter(deposit=deposit) + + assert len(deposit_requests) == 2 + for deposit_request in deposit_requests: + assert deposit_request.deposit == deposit + if deposit_request.type == 'archive': + check_archive(sample_archive['name'], deposit_request.archive.name) + else: + assert deposit_request.metadata['id'] == \ + 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + assert deposit_request.raw_metadata == \ + data_atom_entry.decode('utf-8') + + replace_metadata_uri = response._headers['location'][1] + response = authenticated_client.put( + replace_metadata_uri, + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data-deposit-binary'], + HTTP_IN_PROGRESS='false') + + assert response.status_code == status.HTTP_204_NO_CONTENT + + # deposit_id did not change + deposit = Deposit.objects.get(pk=deposit_id) + assert 
deposit.status == DEPOSIT_STATUS_DEPOSITED + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + + deposit_requests = DepositRequest.objects.filter(deposit=deposit) + assert len(deposit_requests) == 2 + for deposit_request in deposit_requests: + assert deposit_request.deposit == deposit + if deposit_request.type == 'archive': + check_archive(sample_archive['name'], deposit_request.archive.name) + else: + assert deposit_request.metadata['id'] == \ + 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + assert deposit_request.raw_metadata == \ + atom_dataset['entry-data-deposit-binary'].decode('utf-8') + +# FAILURE scenarios + + +def test_post_deposit_multipart_only_archive_and_atom_entry( + authenticated_client, deposit_collection): + """Multipart deposit only accepts one archive and one atom+xml""" + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + archive_content = b'some content representing archive' + archive = InMemoryUploadedFile(BytesIO(archive_content), + field_name='archive0', + name='archive0', + content_type='application/x-tar', + size=len(archive_content), + charset=None) + + other_archive_content = b"some-other-content" + other_archive = InMemoryUploadedFile(BytesIO(other_archive_content), + field_name='atom0', + name='atom0', + content_type='application/x-tar', + size=len(other_archive_content), + charset='utf-8') + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': other_archive, + }, + # + headers + HTTP_IN_PROGRESS='false', + HTTP_SLUG='external-id') + + # then + assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE + assert 'Only 1 application/zip (or application/x-tar) archive' in \ + response.content.decode('utf-8') + + # when + archive.seek(0) + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + }, + # + headers + 
HTTP_IN_PROGRESS='false', + HTTP_SLUG='external-id') + + # then + assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE + assert ( + 'You must provide both 1 application/zip (or ' + 'application/x-tar) and 1 atom+xml entry for ' + 'multipart deposit' in response.content.decode('utf-8') + ) is True + + +def test_post_deposit_multipart_400_when_badly_formatted_xml( + authenticated_client, deposit_collection, + sample_archive, atom_dataset): + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + archive_content = sample_archive['data'] + archive = InMemoryUploadedFile( + BytesIO(archive_content), + field_name=sample_archive['name'], + name=sample_archive['name'], + content_type='application/zip', + size=len(archive_content), + charset=None) + + data_atom_entry_ko = atom_dataset['entry-data-ko'] + atom_entry = InMemoryUploadedFile( + BytesIO(data_atom_entry_ko), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(data_atom_entry_ko), + charset='utf-8') + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }, + # + headers + HTTP_IN_PROGRESS='false', + HTTP_SLUG='external-id', + ) + + assert b'Malformed xml metadata' in response.content + assert response.status_code == status.HTTP_400_BAD_REQUEST diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py index 0d298477..4d41a1f2 100644 --- a/swh/deposit/tests/common.py +++ b/swh/deposit/tests/common.py @@ -1,568 +1,586 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import hashlib import os +import re import shutil import tarfile import tempfile from django.urls import reverse from django.test import TestCase from io import 
BytesIO import pytest from rest_framework import status from swh.deposit.config import (COL_IRI, EM_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_DEPOSITED) from swh.deposit.models import DepositClient, DepositCollection, Deposit from swh.deposit.models import DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.settings.testing import MEDIA_ROOT from swh.core import tarball def compute_info(archive_path): """Given a path, compute information on path. """ with open(archive_path, 'rb') as f: length = 0 sha1sum = hashlib.sha1() md5sum = hashlib.md5() data = b'' for chunk in f: sha1sum.update(chunk) md5sum.update(chunk) length += len(chunk) data += chunk return { 'dir': os.path.dirname(archive_path), 'name': os.path.basename(archive_path), 'path': archive_path, 'length': length, 'sha1sum': sha1sum.hexdigest(), 'md5sum': md5sum.hexdigest(), 'data': data } def _compress(path, extension, dir_path): """Compress path according to extension """ if extension == 'zip' or extension == 'tar': return tarball.compress(path, extension, dir_path) elif '.' in extension: split_ext = extension.split('.') if split_ext[0] != 'tar': raise ValueError( 'Development error, only zip or tar archive supported, ' '%s not supported' % extension) # deal with specific tar mode = split_ext[1] supported_mode = ['xz', 'gz', 'bz2'] if mode not in supported_mode: raise ValueError( 'Development error, only %s supported, %s not supported' % ( supported_mode, mode)) files = tarball._ls(dir_path) with tarfile.open(path, 'w:%s' % mode) as t: for fpath, fname in files: t.add(fpath, arcname=fname, recursive=False) return path def create_arborescence_archive(root_path, archive_name, filename, content, up_to_size=None, extension='zip'): """Build an archive named archive_name in the root_path. This archive contains one file named filename with the content content. 
Args: root_path (str): Location path of the archive to create archive_name (str): Archive's name (without extension) filename (str): Archive's content is only one filename content (bytes): Content of the filename up_to_size (int | None): Fill in the blanks size to oversize or complete an archive's size extension (str): Extension of the archive to write (default is zip) Returns: dict with the keys: - dir: the directory of that archive - path: full path to the archive - sha1sum: archive's sha1sum - length: archive's length """ os.makedirs(root_path, exist_ok=True) archive_path_dir = tempfile.mkdtemp(dir=root_path) dir_path = os.path.join(archive_path_dir, archive_name) os.mkdir(dir_path) filepath = os.path.join(dir_path, filename) _length = len(content) count = 0 batch_size = 128 with open(filepath, 'wb') as f: f.write(content) if up_to_size: # fill with blank content up to a given size count += _length while count < up_to_size: f.write(b'0'*batch_size) count += batch_size _path = '%s.%s' % (dir_path, extension) _path = _compress(_path, extension, dir_path) return compute_info(_path) def create_archive_with_archive(root_path, name, archive): """Create an archive holding another. """ invalid_archive_path = os.path.join(root_path, name) with tarfile.open(invalid_archive_path, 'w:gz') as _archive: _archive.add(archive['path'], arcname=archive['name']) return compute_info(invalid_archive_path) @pytest.mark.fs class FileSystemCreationRoutine(TestCase): """Mixin intended for tests needed to tamper with archives. 
""" def setUp(self): """Define the test client and other test variables.""" super().setUp() self.root_path = '/tmp/swh-deposit/test/build-zip/' os.makedirs(self.root_path, exist_ok=True) self.archive = create_arborescence_archive( self.root_path, 'archive1', 'file1', b'some content in file') self.atom_entry = b"""<?xml version="1.0"?> <entry xmlns="http://www.w3.org/2005/Atom"> <title>Awesome Compiler</title> <id>urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a</id> <external_identifier>1785io25c695</external_identifier> <updated>2017-10-07T15:17:08Z</updated> <author>some awesome author</author> <url>https://hal-test.archives-ouvertes.fr</url> </entry>""" def tearDown(self): super().tearDown() shutil.rmtree(self.root_path) def create_simple_binary_deposit(self, status_partial=True): response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/zip', data=self.archive['data'], CONTENT_LENGTH=self.archive['length'], HTTP_MD5SUM=self.archive['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive['name'], )) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] if status_partial: expected_status = DEPOSIT_STATUS_PARTIAL else: expected_status = DEPOSIT_STATUS_VERIFIED self.assertEqual(_status, expected_status) deposit_id = int(response_content['deposit_id']) return deposit_id def create_complex_binary_deposit(self, status_partial=False): deposit_id = self.create_simple_binary_deposit( status_partial=True) # Add a second archive to the deposit # update its status to DEPOSIT_STATUS_VERIFIED response = self.client.post( reverse(EM_IRI, args=[self.collection.name, deposit_id]), content_type='application/zip', data=self.archive2['data'], CONTENT_LENGTH=self.archive2['length'], HTTP_MD5SUM=self.archive2['md5sum'], HTTP_SLUG='external-id', 
HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip') # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def create_deposit_archive_with_archive(self, archive_extension): # we create the holding archive to a given extension archive = create_arborescence_archive( self.root_path, 'archive1', 'file1', b'some content in file', extension=archive_extension) # now we create an archive holding the first created archive invalid_archive = create_archive_with_archive( self.root_path, 'invalid.tar.gz', archive) # we deposit it response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/x-tar', data=invalid_archive['data'], CONTENT_LENGTH=invalid_archive['length'], HTTP_MD5SUM=invalid_archive['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=False, HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( invalid_archive['name'], )) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] self.assertEqual(_status, DEPOSIT_STATUS_DEPOSITED) deposit_id = int(response_content['deposit_id']) return deposit_id def update_binary_deposit(self, deposit_id, status_partial=False): # update existing deposit with atom entry metadata response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.codemeta_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] if status_partial: expected_status = DEPOSIT_STATUS_PARTIAL else: expected_status = DEPOSIT_STATUS_DEPOSITED self.assertEqual(_status, 
expected_status) deposit_id = int(response_content['deposit_id']) return deposit_id @pytest.mark.fs class BasicTestCase(TestCase): """Mixin intended for data setup purposes (user, collection, etc...) """ def setUp(self): """Define the test client and other test variables.""" super().setUp() # expanding diffs in tests self.maxDiff = None # basic minimum test data _name = 'hal' _provider_url = 'https://hal-test.archives-ouvertes.fr/' _domain = 'archives-ouvertes.fr/' # set collection up _collection = DepositCollection(name=_name) _collection.save() # set user/client up _client = DepositClient.objects.create_user(username=_name, password=_name, provider_url=_provider_url, domain=_domain) _client.collections = [_collection.id] _client.last_name = _name _client.save() self.collection = _collection self.user = _client self.username = _name self.userpass = _name def tearDown(self): super().tearDown() # Clean up uploaded files in temporary directory (tests have # their own media root folder) if os.path.exists(MEDIA_ROOT): for d in os.listdir(MEDIA_ROOT): shutil.rmtree(os.path.join(MEDIA_ROOT, d)) class WithAuthTestCase(TestCase): """Mixin intended for testing the api with basic authentication. """ def setUp(self): super().setUp() _token = '%s:%s' % (self.username, self.userpass) token = base64.b64encode(_token.encode('utf-8')) authorization = 'Basic %s' % token.decode('utf-8') self.client.credentials(HTTP_AUTHORIZATION=authorization) def tearDown(self): super().tearDown() self.client.credentials() class CommonCreationRoutine(TestCase): """Mixin class to share initialization routine. 
cf: `class`:test_deposit_update.DepositReplaceExistingDataTest `class`:test_deposit_update.DepositUpdateDepositWithNewDataTest `class`:test_deposit_update.DepositUpdateFailuresTest `class`:test_deposit_delete.DepositDeleteTest """ def setUp(self): super().setUp() self.atom_entry_data0 = b"""<?xml version="1.0"?> <entry xmlns="http://www.w3.org/2005/Atom"> <external_identifier>some-external-id</external_identifier> <url>https://hal-test.archives-ouvertes.fr/some-external-id</url> <author>some awesome author</author> </entry>""" self.atom_entry_data1 = b"""<?xml version="1.0"?> <entry xmlns="http://www.w3.org/2005/Atom"> <author>another one</author> <author>no one</author> <codemeta:dateCreated>2017-10-07T15:17:08Z</codemeta:dateCreated> </entry>""" self.atom_entry_data2 = b"""<?xml version="1.0"?> <entry xmlns="http://www.w3.org/2005/Atom"> <title>Awesome Compiler</title> <id>urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a</id> <external_identifier>1785io25c695</external_identifier> <updated>2017-10-07T15:17:08Z</updated> <author>some awesome author</author> <url>https://hal-test.archives-ouvertes.fr/id</url> </entry>""" self.codemeta_entry_data0 = b"""<?xml version="1.0"?> <entry xmlns="http://www.w3.org/2005/Atom" xmlns:codemeta="https://doi.org/10.5063/SCHEMA/CODEMETA-2.0"> <title>Awesome Compiler</title> <url>https://hal-test.archives-ouvertes.fr/1785io25c695</url> <id>urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a</id> <external_identifier>1785io25c695</external_identifier> <updated>2017-10-07T15:17:08Z</updated> <author>some awesome author</author> <codemeta:description>description</codemeta:description> <codemeta:keywords>key-word 1</codemeta:keywords> </entry>""" self.codemeta_entry_data1 = b"""<?xml version="1.0" encoding="utf-8"?> <entry xmlns="http://www.w3.org/2005/Atom" xmlns:codemeta="https://doi.org/10.5063/SCHEMA/CODEMETA-2.0"> <title>Composing a Web of Audio Applications</title> <client>hal</client> <id>hal-01243065</id> 
<external_identifier>hal-01243065</external_identifier> <codemeta:url>https://hal-test.archives-ouvertes.fr/hal-01243065</codemeta:url> <codemeta:applicationCategory>test</codemeta:applicationCategory> <codemeta:keywords>DSP programming,Web</codemeta:keywords> <codemeta:dateCreated>2017-05-03T16:08:47+02:00</codemeta:dateCreated> <codemeta:description>this is the description</codemeta:description> <codemeta:version>1</codemeta:version> <codemeta:runtimePlatform>phpstorm</codemeta:runtimePlatform> <codemeta:developmentStatus>stable</codemeta:developmentStatus> <codemeta:programmingLanguage>php</codemeta:programmingLanguage> <codemeta:programmingLanguage>python</codemeta:programmingLanguage> <codemeta:programmingLanguage>C</codemeta:programmingLanguage> <codemeta:license> <codemeta:name>GNU General Public License v3.0 only</codemeta:name> </codemeta:license> <codemeta:license> <codemeta:name>CeCILL Free Software License Agreement v1.1</codemeta:name> </codemeta:license> <author> <name>HAL</name> <email>hal@ccsd.cnrs.fr</email> </author> <codemeta:author> <codemeta:name>Morane Gruenpeter</codemeta:name> </codemeta:author> </entry>""" def create_deposit_with_invalid_archive(self, external_id='some-external-id-1'): url = reverse(COL_IRI, args=[self.collection.name]) data = b'some data which is clearly not a zip file' md5sum = hashlib.md5(data).hexdigest() # when response = self.client.post( url, content_type='application/zip', # as zip data=data, # + headers CONTENT_LENGTH=len(data), # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=md5sum, HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def create_deposit_with_status( self, status, external_id='some-external-id-1', swh_id=None, swh_id_context=None, swh_anchor_id=None, 
swh_anchor_id_context=None, status_detail=None): # create an invalid deposit which we will update further down the line deposit_id = self.create_deposit_with_invalid_archive(external_id) # We cannot create some form of deposit with a given status in # test context ('rejected' for example). Update in place the # deposit with such status to permit some further tests. deposit = Deposit.objects.get(pk=deposit_id) if status == DEPOSIT_STATUS_REJECTED: deposit.status_detail = status_detail deposit.status = status if swh_id: deposit.swh_id = swh_id if swh_id_context: deposit.swh_id_context = swh_id_context if swh_anchor_id: deposit.swh_anchor_id = swh_anchor_id if swh_anchor_id_context: deposit.swh_anchor_id_context = swh_anchor_id_context deposit.save() return deposit_id def create_simple_deposit_partial(self, external_id='some-external-id'): """Create a simple deposit (1 request) in `partial` state and returns its new identifier. Returns: deposit id """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data0, HTTP_SLUG=external_id, HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def create_deposit_partial_with_data_in_args(self, data): """Create a simple deposit (1 request) in `partial` state with the data or metadata as an argument and returns its new identifier. 
Args: data: atom entry Returns: deposit id """ if isinstance(data, str): data = data.encode('utf-8') response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=data, HTTP_SLUG='external-id', HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def _update_deposit_with_status(self, deposit_id, status_partial=False): """Add to a given deposit another archive and update its current status to `deposited` (by default). Returns: deposit id """ # when response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) # then assert response.status_code == status.HTTP_201_CREATED return deposit_id def create_deposit_ready(self, external_id='some-external-id'): """Create a complex deposit (2 requests) in status `deposited`. """ deposit_id = self.create_simple_deposit_partial( external_id=external_id) deposit_id = self._update_deposit_with_status(deposit_id) return deposit_id def create_deposit_partial(self, external_id='some-external-id'): """Create a complex deposit (2 requests) in status `partial`. """ deposit_id = self.create_simple_deposit_partial( external_id=external_id) deposit_id = self._update_deposit_with_status( deposit_id, status_partial=True) return deposit_id def add_metadata_to_deposit(self, deposit_id, status_partial=False): """Add metadata to deposit. 
""" # when response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.codemeta_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) assert response.status_code == status.HTTP_201_CREATED # then deposit = Deposit.objects.get(pk=deposit_id) assert deposit is not None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert deposit_requests is not [] for dr in deposit_requests: if dr.type == 'metadata': assert deposit_requests[0].metadata is not {} return deposit_id + + +def check_archive(archive_name: str, archive_name_to_check: str): + """Helper function to ensure archive_name is present within the + archive_name_to_check. + + Raises: + AssertionError if archive_name is not present within + archive_name_to_check + + """ + if '.' in archive_name: + filename, extension = archive_name.split('.') + pattern = re.compile('.*/%s.*\\.%s' % (filename, extension)) + else: + pattern = re.compile('.*/%s' % archive_name) + assert pattern.match(archive_name_to_check) is not None diff --git a/swh/deposit/tests/test_common.py b/swh/deposit/tests/test_common.py new file mode 100644 index 00000000..588a4675 --- /dev/null +++ b/swh/deposit/tests/test_common.py @@ -0,0 +1,26 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.deposit.tests.common import check_archive + + +def test_check_archive_helper(): + # success + for archive_name, archive_name_to_check in [ + ('filename0', 'something/filename0'), + ('archive.zip', 'client_1/archive_noisynoise.zip'), + ]: + check_archive(archive_name, archive_name_to_check) + + # failures + for archive_name, archive_name_to_check in [ + ('filename0', 'something-filename0'), + ('archive.zip', 
'client_1_archive_noisynoise.zip'), + ('reference', 'irrelevant'), + ]: + with pytest.raises(AssertionError): + check_archive(archive_name, archive_name_to_check)