diff --git a/swh/deposit/api/private/deposit_check.py b/swh/deposit/api/private/deposit_check.py index 19f37747..d8d56334 100644 --- a/swh/deposit/api/private/deposit_check.py +++ b/swh/deposit/api/private/deposit_check.py @@ -1,238 +1,246 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import re import tarfile import zipfile from rest_framework import status + from . import DepositReadMixin from ..common import SWHGetDepositAPI, SWHPrivateAPIView from ...config import DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_REJECTED from ...config import ARCHIVE_TYPE from ...models import Deposit MANDATORY_FIELDS_MISSING = 'Mandatory fields are missing' ALTERNATE_FIELDS_MISSING = 'Mandatory alternate fields are missing' INCOMPATIBLE_URL_FIELDS = "At least one url field must be compatible with the client's domain name" # noqa MANDATORY_ARCHIVE_UNREADABLE = 'At least one of its associated archives is not readable' # noqa MANDATORY_ARCHIVE_INVALID = 'Mandatory archive is invalid (i.e contains only one archive)' # noqa MANDATORY_ARCHIVE_UNSUPPORTED = 'Mandatory archive type is not supported' MANDATORY_ARCHIVE_MISSING = 'Deposit without archive is rejected' +ARCHIVE_EXTENSIONS = [ + 'zip', 'tar', 'tar.gz', 'xz', 'tar.xz', 'bz2', + 'tar.bz2', 'Z', 'tar.Z', 'tgz', '7z' +] + +PATTERN_ARCHIVE_EXTENSION = re.compile( + r'.*\.(%s)$' % '|'.join(ARCHIVE_EXTENSIONS)) + class SWHChecksDeposit(SWHGetDepositAPI, SWHPrivateAPIView, DepositReadMixin): """Dedicated class to read a deposit's raw archives content. Only GET is supported. """ def _check_deposit_archives(self, deposit): """Given a deposit, check each deposit request of type archive. Args: The deposit to check archives for Returns tuple (status, error_detail): True, None if all archives are ok, (False, ) otherwise. """ requests = list(self._deposit_requests( deposit, request_type=ARCHIVE_TYPE)) if len(requests) == 0: # no associated archive is refused return False, { 'archive': [{ 'summary': MANDATORY_ARCHIVE_MISSING, }] } errors = [] for archive_request in requests: check, error_message = self._check_archive(archive_request) if not check: errors.append({ 'summary': error_message, 'fields': [archive_request.id] }) if not errors: return True, None return False, { 'archive': errors } def _check_archive(self, archive_request): """Check that a deposit associated archive is ok: - readable - supported archive format - content of the archive is not a single archive If any of those checks are not ok, return the corresponding failing check. Args: archive_path (DepositRequest): Archive to check Returns: (True, None) if archive is check compliant, (False, ) otherwise. """ archive_path = archive_request.archive.path try: if zipfile.is_zipfile(archive_path): with zipfile.ZipFile(archive_path) as f: files = f.namelist() elif tarfile.is_tarfile(archive_path): with tarfile.open(archive_path) as f: files = f.getnames() else: return False, MANDATORY_ARCHIVE_UNSUPPORTED except Exception: return False, MANDATORY_ARCHIVE_UNREADABLE if len(files) > 1: return True, None element = files[0] - pattern = re.compile( - r'.*\.(zip|tar|tar.gz|.xz|tar.xz|Z|.tar.Z|bz2|tar.bz2)$') - if pattern.match(element): # invalid archive in archive + if PATTERN_ARCHIVE_EXTENSION.match(element): + # archive in archive! return False, MANDATORY_ARCHIVE_INVALID return True, None def _check_metadata(self, metadata): """Check to execute on all metadata for mandatory field presence. Args: metadata (dict): Metadata dictionary to check for mandatory fields Returns: tuple (status, error_detail): True, None if metadata are ok (False, ) otherwise. """ required_fields = { 'url': False, 'external_identifier': False, 'author': False, } alternate_fields = { ('name', 'title'): False, # alternate field, at least one - # of them must be present + # of them must be present } for field, value in metadata.items(): for name in required_fields: if name in field: required_fields[name] = True for possible_names in alternate_fields: for possible_name in possible_names: if possible_name in field: alternate_fields[possible_names] = True continue mandatory_result = [k for k, v in required_fields.items() if not v] optional_result = [ ' or '.join(k) for k, v in alternate_fields.items() if not v] if mandatory_result == [] and optional_result == []: return True, None detail = [] if mandatory_result != []: detail.append({ 'summary': MANDATORY_FIELDS_MISSING, 'fields': mandatory_result }) if optional_result != []: detail.append({ 'summary': ALTERNATE_FIELDS_MISSING, 'fields': optional_result, }) return False, { 'metadata': detail } def _check_url(self, client_domain, metadata): """Check compatibility between client_domain and url field in metadata Args: client_domain (str): url associated with the deposit's client metadata (dict): Metadata where to find url Returns: tuple (status, error_detail): True, None if url associated with the deposit's client is ok, (False, ) otherwise. """ url_fields = [] for field in metadata: if 'url' in field: if client_domain in metadata[field]: return True, None url_fields.append(field) detail = { 'url': { 'summary': INCOMPATIBLE_URL_FIELDS, } } if url_fields: detail['url']['fields'] = url_fields return False, detail def process_get(self, req, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. Args: req (Request): collection_name (str): Collection owning the deposit deposit_id (id): Deposit concerned by the reading Returns: Tuple status, stream of content, content-type """ deposit = Deposit.objects.get(pk=deposit_id) client_domain = deposit.client.domain metadata = self._metadata_get(deposit) problems = {} # will check each deposit's associated request (both of type # archive and metadata) for errors archives_status, error_detail = self._check_deposit_archives(deposit) if not archives_status: problems.update(error_detail) metadata_status, error_detail = self._check_metadata(metadata) if not metadata_status: problems.update(error_detail) url_status, error_detail = self._check_url(client_domain, metadata) if not url_status: problems.update(error_detail) deposit_status = archives_status and metadata_status and url_status # if any problems arose, the deposit is rejected if not deposit_status: deposit.status = DEPOSIT_STATUS_REJECTED deposit.status_detail = problems response = { 'status': deposit.status, 'details': deposit.status_detail, } else: deposit.status = DEPOSIT_STATUS_VERIFIED response = { 'status': deposit.status, } deposit.save() return status.HTTP_200_OK, json.dumps(response), 'application/json' diff --git a/swh/deposit/tests/api/test_deposit_binary.py b/swh/deposit/tests/api/test_deposit_binary.py index e28ba807..469f8e6e 100644 --- a/swh/deposit/tests/api/test_deposit_binary.py +++ b/swh/deposit/tests/api/test_deposit_binary.py @@ -1,652 +1,652 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.files.uploadedfile import InMemoryUploadedFile from django.core.urlresolvers import reverse from io import BytesIO from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.tests import TEST_CONFIG from swh.deposit.config import COL_IRI, EM_IRI from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml -from ..common import BasicTestCase, WithAuthTestCase, create_arborescence_zip +from ..common import BasicTestCase, WithAuthTestCase, create_arborescence_archive from ..common import FileSystemCreationRoutine class DepositTestCase(APITestCase, WithAuthTestCase, BasicTestCase, FileSystemCreationRoutine): """Try and upload one single deposit """ def setUp(self): super().setUp() self.atom_entry_data0 = b""" Awesome Compiler hal urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a %s 2017-10-07T15:17:08Z some awesome author something awesome-compiler This is an awesome compiler destined to awesomely compile stuff and other stuff compiler,programming,language 2005-10-07T17:17:08Z 2005-10-07T17:17:08Z release note related link Awesome https://hoster.org/awesome-compiler GNU/Linux 0.0.1 running all """ self.atom_entry_data1 = b""" hal urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a 2017-10-07T15:17:08Z some awesome author something awesome-compiler This is an awesome compiler destined to awesomely compile stuff and other stuff compiler,programming,language 2005-10-07T17:17:08Z 2005-10-07T17:17:08Z release note related link Awesome https://hoster.org/awesome-compiler GNU/Linux 0.0.1 running all """ self.atom_entry_data2 = b""" %s """ self.atom_entry_data_empty_body = b""" """ self.atom_entry_data3 = b""" something """ self.data_atom_entry_ok = b""" Title urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 2005-10-07T17:17:08Z Contributor The abstract The abstract Access Rights Alternative Title Date Available Bibliographic Citation # noqa Contributor Description Has Part Has Version Identifier Is Part Of Publisher References Rights Holder Source Title Type """ @istest def post_deposit_binary_without_slug_header_is_bad_request(self): """Posting a binary deposit without slug header should return 400 """ url = reverse(COL_IRI, args=[self.collection.name]) # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') self.assertIn(b'Missing SLUG header', response.content) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) @istest def post_deposit_binary_upload_final_and_status_check(self): """Binary upload with correct headers should return 201 with receipt """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive['name'], )) # then response_content = parse_xml(BytesIO(response.content)) self.assertEqual(response.status_code, status.HTTP_201_CREATED) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_request = DepositRequest.objects.get(deposit=deposit) self.assertEquals(deposit_request.deposit, deposit) self.assertRegex(deposit_request.archive.name, self.archive['name']) response_content = parse_xml(BytesIO(response.content)) self.assertEqual(response_content['deposit_archive'], self.archive['name']) self.assertEqual(int(response_content['deposit_id']), deposit.id) self.assertEqual(response_content['deposit_status'], deposit.status) edit_se_iri = reverse('edit_se_iri', args=[self.collection.name, deposit.id]) self.assertEqual(response._headers['location'], ('Location', 'http://testserver' + edit_se_iri)) @istest def post_deposit_binary_upload_supports_zip_or_tar(self): """Binary upload with content-type not in [zip,x-tar] should return 415 """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/octet-stream', data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) @istest def post_deposit_binary_fails_if_unsupported_packaging_header( self): """Bin deposit without supported content_disposition header returns 400 """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id' # when response = self.client.post( url, content_type='application/zip', data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='something-unsupported', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) @istest def post_deposit_binary_upload_fail_if_no_content_disposition_header( self): """Binary upload without content_disposition header should return 400 """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id' # when response = self.client.post( url, content_type='application/zip', data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false') # then self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) @istest def post_deposit_mediation_not_supported(self): """Binary upload with mediation should return a 412 response """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_ON_BEHALF_OF='someone', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_412_PRECONDITION_FAILED) with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) @istest def post_deposit_binary_upload_fail_if_upload_size_limit_exceeded( self): """Binary upload must not exceed the limit set up... """ # given url = reverse(COL_IRI, args=[self.collection.name]) - archive = create_arborescence_zip( + archive = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some content in file', up_to_size=TEST_CONFIG['max_upload_size']) external_id = 'some-external-id' # when response = self.client.post( url, content_type='application/zip', data=archive['data'], # + headers CONTENT_LENGTH=archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_413_REQUEST_ENTITY_TOO_LARGE) self.assertRegex(response.content, b'Upload size limit exceeded') with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) @istest def post_deposit_2_post_2_different_deposits(self): """2 posting deposits should return 2 different 201 with receipt """ url = reverse(COL_IRI, args=[self.collection.name]) # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG='some-external-id-1', HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) deposits = Deposit.objects.all() self.assertEqual(len(deposits), 1) self.assertEqual(deposits[0], deposit) # second post response = self.client.post( url, content_type='application/x-tar', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG='another-external-id', HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename1') self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id2 = response_content['deposit_id'] deposit2 = Deposit.objects.get(pk=deposit_id2) self.assertNotEqual(deposit, deposit2) deposits = Deposit.objects.all().order_by('id') self.assertEqual(len(deposits), 2) self.assertEqual(list(deposits), [deposit, deposit2]) @istest def post_deposit_binary_and_post_to_add_another_archive(self): """Updating a deposit should return a 201 with receipt """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='true', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive['name'], )) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, 'partial') self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_request = DepositRequest.objects.get(deposit=deposit) self.assertEquals(deposit_request.deposit, deposit) self.assertEquals(deposit_request.type.name, 'archive') self.assertRegex(deposit_request.archive.name, self.archive['name']) # 2nd archive to upload - archive2 = create_arborescence_zip( + archive2 = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some other content in file') # uri to update the content update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) # adding another archive for the deposit and finalizing it response = self.client.post( update_uri, content_type='application/zip', # as zip data=archive2['data'], # + headers CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( archive2['name'])) self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_requests = list(DepositRequest.objects.filter(deposit=deposit). order_by('id')) # 2 deposit requests for the same deposit self.assertEquals(len(deposit_requests), 2) self.assertEquals(deposit_requests[0].deposit, deposit) self.assertEquals(deposit_requests[0].type.name, 'archive') self.assertRegex(deposit_requests[0].archive.name, self.archive['name']) self.assertEquals(deposit_requests[1].deposit, deposit) self.assertEquals(deposit_requests[1].type.name, 'archive') self.assertRegex(deposit_requests[1].archive.name, archive2['name']) # only 1 deposit in db deposits = Deposit.objects.all() self.assertEqual(len(deposits), 1) @istest def post_deposit_then_post_or_put_is_refused_when_status_ready(self): """Updating a deposit with status 'ready' should return a 400 """ url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_request = DepositRequest.objects.get(deposit=deposit) self.assertEquals(deposit_request.deposit, deposit) self.assertRegex(deposit_request.archive.name, 'filename0') # updating/adding is forbidden # uri to update the content edit_se_iri = reverse( 'edit_se_iri', args=[self.collection.name, deposit_id]) em_iri = reverse( 'em_iri', args=[self.collection.name, deposit_id]) # Testing all update/add endpoint should fail # since the status is ready - archive2 = create_arborescence_zip( + archive2 = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some content in file 2') # replacing file is no longer possible since the deposit's # status is ready r = self.client.put( em_iri, content_type='application/zip', data=archive2['data'], CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) # adding file is no longer possible since the deposit's status # is ready r = self.client.post( em_iri, content_type='application/zip', data=archive2['data'], CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) # replacing metadata is no longer possible since the deposit's # status is ready r = self.client.put( edit_se_iri, content_type='application/atom+xml;type=entry', data=self.data_atom_entry_ok, CONTENT_LENGTH=len(self.data_atom_entry_ok), HTTP_SLUG=external_id) self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) # adding new metadata is no longer possible since the # deposit's status is ready r = self.client.post( edit_se_iri, content_type='application/atom+xml;type=entry', data=self.data_atom_entry_ok, CONTENT_LENGTH=len(self.data_atom_entry_ok), HTTP_SLUG=external_id) self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) archive_content = b'some content representing archive' archive = InMemoryUploadedFile( BytesIO(archive_content), field_name='archive0', name='archive0', content_type='application/zip', size=len(archive_content), charset=None) atom_entry = InMemoryUploadedFile( BytesIO(self.data_atom_entry_ok), field_name='atom0', name='atom0', content_type='application/atom+xml; charset="utf-8"', size=len(self.data_atom_entry_ok), charset='utf-8') # replacing multipart metadata is no longer possible since the # deposit's status is ready r = self.client.put( edit_se_iri, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }) self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) # adding new metadata is no longer possible since the # deposit's status is ready r = self.client.post( edit_se_iri, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }) self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) diff --git a/swh/deposit/tests/api/test_deposit_check.py b/swh/deposit/tests/api/test_deposit_check.py index a3fd179f..e2d15831 100644 --- a/swh/deposit/tests/api/test_deposit_check.py +++ b/swh/deposit/tests/api/test_deposit_check.py @@ -1,256 +1,247 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest from django.core.urlresolvers import reverse from nose.tools import istest from nose.plugins.attrib import attr from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.config import ( DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED ) from swh.deposit.api.private.deposit_check import ( SWHChecksDeposit, MANDATORY_ARCHIVE_INVALID, MANDATORY_FIELDS_MISSING, INCOMPATIBLE_URL_FIELDS, MANDATORY_ARCHIVE_UNSUPPORTED, ALTERNATE_FIELDS_MISSING, MANDATORY_ARCHIVE_MISSING ) from swh.deposit.models import Deposit from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ..common import FileSystemCreationRoutine @attr('fs') class CheckDepositTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine): """Check deposit endpoints. """ def setUp(self): super().setUp() @istest def deposit_ok(self): """Proper deposit should succeed the checks (-> status ready) """ deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit_id = self.update_binary_deposit(deposit_id, status_partial=False) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED) @istest def deposit_invalid_tarball(self): """Deposit with tarball (of 1 tarball) should fail the checks: rejected """ - deposit_id = self.create_deposit_archive_with_archive() + for archive_extension in ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']: + deposit_id = self.create_deposit_archive_with_archive( + archive_extension) - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) + deposit = Deposit.objects.get(pk=deposit_id) + self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) - url = reverse(PRIVATE_CHECK_DEPOSIT, - args=[self.collection.name, deposit.id]) + url = reverse(PRIVATE_CHECK_DEPOSIT, + args=[self.collection.name, deposit.id]) - response = self.client.get(url) + response = self.client.get(url) - self.assertEqual(response.status_code, status.HTTP_200_OK) - data = json.loads(response.content.decode('utf-8')) - self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) - details = data['details'] - # archive checks failure - self.assertEqual(len(details['archive']), 1) - self.assertEqual(details['archive'][0]['summary'], - MANDATORY_ARCHIVE_INVALID) - # metadata check failure - self.assertEqual(len(details['metadata']), 2) - mandatory = details['metadata'][0] - self.assertEqual(mandatory['summary'], MANDATORY_FIELDS_MISSING) - self.assertEqual(set(mandatory['fields']), - set(['url', 'external_identifier', 'author'])) - alternate = details['metadata'][1] - self.assertEqual(alternate['summary'], ALTERNATE_FIELDS_MISSING) - self.assertEqual(alternate['fields'], ['name or title']) - # url check failure - self.assertEqual(details['url']['summary'], INCOMPATIBLE_URL_FIELDS) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = json.loads(response.content.decode('utf-8')) + self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) + details = data['details'] + # archive checks failure + self.assertEqual(len(details['archive']), 1) + self.assertEqual(details['archive'][0]['summary'], + MANDATORY_ARCHIVE_INVALID) - deposit = Deposit.objects.get(pk=deposit.id) - self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) + deposit = Deposit.objects.get(pk=deposit.id) + self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) @istest def deposit_ko_missing_tarball(self): """Deposit without archive should fail the checks: rejected """ deposit_id = self.create_deposit_ready() # no archive, only atom deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) details = data['details'] # archive checks failure self.assertEqual(len(details['archive']), 1) self.assertEqual(details['archive'][0]['summary'], MANDATORY_ARCHIVE_MISSING) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) @istest def deposit_ko_unsupported_tarball(self): """Deposit with an unsupported tarball should fail the checks: rejected """ deposit_id = self.create_deposit_with_invalid_archive() deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) details = data['details'] # archive checks failure self.assertEqual(len(details['archive']), 1) self.assertEqual(details['archive'][0]['summary'], MANDATORY_ARCHIVE_UNSUPPORTED) # metadata check failure self.assertEqual(len(details['metadata']), 2) mandatory = details['metadata'][0] self.assertEqual(mandatory['summary'], MANDATORY_FIELDS_MISSING) self.assertEqual(set(mandatory['fields']), set(['url', 'external_identifier', 'author'])) alternate = details['metadata'][1] self.assertEqual(alternate['summary'], ALTERNATE_FIELDS_MISSING) self.assertEqual(alternate['fields'], ['name or title']) # url check failure self.assertEqual(details['url']['summary'], INCOMPATIBLE_URL_FIELDS) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) @istest def check_deposit_metadata_ok(self): """Proper deposit should succeed the checks (-> status ready) with all **MUST** metadata using the codemeta metadata test set """ deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit_id_metadata = self.add_metadata_to_deposit(deposit_id) self.assertEquals(deposit_id, deposit_id_metadata) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED) class CheckMetadata(unittest.TestCase, SWHChecksDeposit): @istest def check_metadata_ok(self): actual_check, detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'name': 'foo', 'author': 'someone', }) self.assertTrue(actual_check) self.assertIsNone(detail) @istest def check_metadata_ok2(self): actual_check, detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'bar', 'author': 'someone', }) self.assertTrue(actual_check) self.assertIsNone(detail) @istest def check_metadata_ko(self): """Missing optional field should be caught """ actual_check, error_detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'author': 'someone', }) expected_error = { 'metadata': [{ 'summary': 'Mandatory alternate fields are missing', 'fields': ['name or title'], }] } self.assertFalse(actual_check) self.assertEqual(error_detail, expected_error) @istest def check_metadata_ko2(self): """Missing mandatory fields should be caught """ actual_check, error_detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'foobar', }) expected_error = { 'metadata': [{ 'summary': 'Mandatory fields are missing', 'fields': ['author'], }] } self.assertFalse(actual_check) self.assertEqual(error_detail, expected_error) diff --git a/swh/deposit/tests/api/test_deposit_read_archive.py b/swh/deposit/tests/api/test_deposit_read_archive.py index b6284308..21c4bf90 100644 --- a/swh/deposit/tests/api/test_deposit_read_archive.py +++ b/swh/deposit/tests/api/test_deposit_read_archive.py @@ -1,130 +1,130 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import os from django.core.urlresolvers import reverse from nose.tools import istest from nose.plugins.attrib import attr from rest_framework import status from rest_framework.test import APITestCase from swh.core import tarball from swh.deposit.config import PRIVATE_GET_RAW_CONTENT from swh.deposit.tests import TEST_CONFIG from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine -from ..common import FileSystemCreationRoutine, create_arborescence_zip +from ..common import FileSystemCreationRoutine, create_arborescence_archive @attr('fs') class DepositReadArchivesTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine): def setUp(self): super().setUp() - self.archive2 = create_arborescence_zip( + self.archive2 = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some other content in file') self.workdir = os.path.join(self.root_path, 'workdir') @istest def access_to_existing_deposit_with_one_archive(self): """Access to deposit should stream a 200 response with its raw content """ deposit_id = self.create_simple_binary_deposit() url = reverse(PRIVATE_GET_RAW_CONTENT, args=[self.collection.name, deposit_id]) r = self.client.get(url) self.assertEquals(r.status_code, status.HTTP_200_OK) self.assertEquals(r._headers['content-type'][1], 'application/octet-stream') # read the stream data = b''.join(r.streaming_content) actual_sha1 = hashlib.sha1(data).hexdigest() self.assertEquals(actual_sha1, self.archive['sha1sum']) # this does not touch the extraction dir so this should stay empty self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), []) def _check_tarball_consistency(self, actual_sha1): tarball.uncompress(self.archive['path'], self.workdir) self.assertEquals(os.listdir(self.workdir), ['file1']) tarball.uncompress(self.archive2['path'], self.workdir) lst = set(os.listdir(self.workdir)) self.assertEquals(lst, {'file1', 'file2'}) new_path = self.workdir + '.zip' tarball.compress(new_path, 'zip', self.workdir) with open(new_path, 'rb') as f: h = hashlib.sha1(f.read()).hexdigest() self.assertEqual(actual_sha1, h) self.assertNotEqual(actual_sha1, self.archive['sha1sum']) self.assertNotEqual(actual_sha1, self.archive2['sha1sum']) @istest def access_to_existing_deposit_with_multiple_archives(self): """Access to deposit should stream a 200 response with its raw contents """ deposit_id = self.create_complex_binary_deposit() url = reverse(PRIVATE_GET_RAW_CONTENT, args=[self.collection.name, deposit_id]) r = self.client.get(url) self.assertEquals(r.status_code, status.HTTP_200_OK) self.assertEquals(r._headers['content-type'][1], 'application/octet-stream') # read the stream data = b''.join(r.streaming_content) actual_sha1 = hashlib.sha1(data).hexdigest() self._check_tarball_consistency(actual_sha1) # this touches the extraction directory but should clean up # after itself self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), []) class DepositReadArchivesFailureTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): @istest def access_to_nonexisting_deposit_returns_404_response(self): """Read unknown collection should return a 404 response """ unknown_id = '999' url = reverse(PRIVATE_GET_RAW_CONTENT, args=[self.collection.name, unknown_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Deposit with id %s does not exist' % unknown_id, response.content.decode('utf-8')) @istest def access_to_nonexisting_collection_returns_404_response(self): """Read unknown deposit should return a 404 response """ collection_name = 'non-existing' deposit_id = self.create_deposit_partial() url = reverse(PRIVATE_GET_RAW_CONTENT, args=[collection_name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Unknown collection name %s' % collection_name, response.content.decode('utf-8')) diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py index 3f227ecc..22478011 100644 --- a/swh/deposit/tests/api/test_deposit_update.py +++ b/swh/deposit/tests/api/test_deposit_update.py @@ -1,345 +1,345 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.urlresolvers import reverse from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.models import Deposit, DepositRequest from swh.deposit.config import EDIT_SE_IRI, EM_IRI from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine -from ..common import FileSystemCreationRoutine, create_arborescence_zip +from ..common import FileSystemCreationRoutine, create_arborescence_archive class DepositUpdateOrReplaceExistingDataTest( APITestCase, WithAuthTestCase, BasicTestCase, FileSystemCreationRoutine, CommonCreationRoutine): """Try put/post (update/replace) query on EM_IRI """ def setUp(self): super().setUp() self.atom_entry_data1 = b""" bar """ self.atom_entry_data1 = b""" bar """ - self.archive2 = create_arborescence_zip( + self.archive2 = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some other content in file') @istest def replace_archive_to_deposit_is_possible(self): """Replace all archive with another one should return a 204 response """ # given deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit = Deposit.objects.get(pk=deposit_id) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']) assert len(list(requests)) == 1 assert self.archive['name'] in requests[0].archive.name # we have no metadata for that deposit requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata'])) assert len(requests) == 0 deposit_id = self._update_deposit_with_status(deposit_id, status_partial=True) requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata'])) assert len(requests) == 1 update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) external_id = 'some-external-id-1' response = self.client.put( update_uri, content_type='application/zip', # as zip data=self.archive2['data'], # + headers CONTENT_LENGTH=self.archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive2['name'], )) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']) self.assertEquals(len(list(requests)), 1) self.assertRegex(requests[0].archive.name, self.archive2['name']) # check we did not touch the other parts requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata'])) self.assertEquals(len(requests), 1) @istest def replace_metadata_to_deposit_is_possible(self): """Replace all metadata with another one should return a 204 response """ # given deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit = Deposit.objects.get(pk=deposit_id) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata']) assert len(list(requests)) == 0 requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive'])) assert len(requests) == 1 update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]) response = self.client.put( update_uri, content_type='application/atom+xml;type=entry', data=self.atom_entry_data1) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata']) self.assertEquals(len(list(requests)), 1) metadata = requests[0].metadata self.assertEquals(metadata['foobar'], 'bar') # check we did not touch the other parts requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive'])) self.assertEquals(len(requests), 1) @istest def add_archive_to_deposit_is_possible(self): """Add another archive to a deposit return a 201 response """ # given deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit = Deposit.objects.get(pk=deposit_id) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']) assert len(list(requests)) == 1 assert self.archive['name'] in requests[0].archive.name requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata'])) assert len(requests) == 0 update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) external_id = 'some-external-id-1' response = self.client.post( update_uri, content_type='application/zip', # as zip data=self.archive2['data'], # + headers CONTENT_LENGTH=self.archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive2['name'],)) self.assertEqual(response.status_code, status.HTTP_201_CREATED) requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']).order_by('id')) self.assertEquals(len(requests), 2) # first archive still exists self.assertRegex(requests[0].archive.name, self.archive['name']) # a new one was added self.assertRegex(requests[1].archive.name, self.archive2['name']) # check we did not touch the other parts requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata'])) self.assertEquals(len(requests), 0) @istest def add_metadata_to_deposit_is_possible(self): """Add metadata with another one should return a 204 response """ # given deposit_id = self.create_deposit_partial() deposit = Deposit.objects.get(pk=deposit_id) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata']) assert len(list(requests)) == 2 requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive'])) assert len(requests) == 0 update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]) response = self.client.post( update_uri, content_type='application/atom+xml;type=entry', data=self.atom_entry_data1) self.assertEqual(response.status_code, status.HTTP_201_CREATED) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata']).order_by('id') self.assertEquals(len(list(requests)), 3) # a new one was added self.assertEquals(requests[1].metadata['foobar'], 'bar') # check we did not touch the other parts requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive'])) self.assertEquals(len(requests), 0) class DepositUpdateFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): """Failure scenario about add/replace (post/put) query on deposit. """ @istest def add_metadata_to_unknown_collection(self): """Replacing metadata to unknown deposit should return a 404 response """ url = reverse(EDIT_SE_IRI, args=['test', 1000]) response = self.client.post( url, content_type='application/atom+xml;type=entry', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertRegex(response.content.decode('utf-8'), 'Unknown collection name test') @istest def add_metadata_to_unknown_deposit(self): """Replacing metadata to unknown deposit should return a 404 response """ url = reverse(EDIT_SE_IRI, args=[self.collection.name, 999]) response = self.client.post( url, content_type='application/atom+xml;type=entry', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertRegex(response.content.decode('utf-8'), 'Deposit with id 999 does not exist') @istest def replace_metadata_to_unknown_deposit(self): """Adding metadata to unknown deposit should return a 404 response """ url = reverse(EDIT_SE_IRI, args=[self.collection.name, 998]) response = self.client.put( url, content_type='application/atom+xml;type=entry', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertRegex(response.content.decode('utf-8'), 'Deposit with id 998 does not exist') @istest def add_archive_to_unknown_deposit(self): """Adding metadata to unknown deposit should return a 404 response """ url = reverse(EM_IRI, args=[self.collection.name, 997]) response = self.client.post( url, content_type='application/zip', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertRegex(response.content.decode('utf-8'), 'Deposit with id 997 does not exist') @istest def replace_archive_to_unknown_deposit(self): """Replacing archive to unknown deposit should return a 404 response """ url = reverse(EM_IRI, args=[self.collection.name, 996]) response = self.client.put( url, content_type='application/zip', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertRegex(response.content.decode('utf-8'), 'Deposit with id 996 does not exist') @istest def post_metadata_to_em_iri_failure(self): """Update (POST) archive with wrong content type should return 400 """ deposit_id = self.create_deposit_partial() # only update on partial update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) response = self.client.post( update_uri, content_type='application/x-gtar-compressed', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertRegex(response.content.decode('utf-8'), 'Packaging format supported is restricted to ' 'application/zip, application/x-tar') @istest def put_metadata_to_em_iri_failure(self): """Update (PUT) archive with wrong content type should return 400 """ # given deposit_id = self.create_deposit_partial() # only update on partial # when update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) response = self.client.put( update_uri, content_type='application/atom+xml;type=entry', data=self.atom_entry_data0) # then self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertRegex(response.content.decode('utf-8'), 'Packaging format supported is restricted to ' 'application/zip, application/x-tar') diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py index c15f442d..76a4f067 100644 --- a/swh/deposit/tests/common.py +++ b/swh/deposit/tests/common.py @@ -1,518 +1,562 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import hashlib import os import shutil import tarfile import tempfile from django.core.urlresolvers import reverse from django.test import TestCase from io import BytesIO from nose.plugins.attrib import attr from rest_framework import status from swh.deposit.config import (COL_IRI, EM_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_DEPOSITED) from swh.deposit.models import DepositClient, DepositCollection, Deposit from swh.deposit.models import DepositRequest from swh.deposit.models import DepositRequestType from swh.deposit.parsers import parse_xml from swh.deposit.settings.testing import MEDIA_ROOT from swh.core import tarball def compute_info(archive_path): """Given a path, compute information on path. """ with open(archive_path, 'rb') as f: length = 0 sha1sum = hashlib.sha1() md5sum = hashlib.md5() data = b'' for chunk in f: sha1sum.update(chunk) md5sum.update(chunk) length += len(chunk) data += chunk return { 'dir': os.path.dirname(archive_path), 'name': os.path.basename(archive_path), 'path': archive_path, 'length': length, 'sha1sum': sha1sum.hexdigest(), 'md5sum': md5sum.hexdigest(), 'data': data } -def create_arborescence_zip(root_path, archive_name, filename, content, - up_to_size=None): +def _compress(path, extension, dir_path): + """Compress path according to extension + + """ + if extension == 'zip' or extension == 'tar': + return tarball.compress(path, extension, dir_path) + elif '.' in extension: + split_ext = extension.split('.') + if split_ext[0] != 'tar': + raise ValueError( + 'Development error, only zip or tar archive supported, ' + '%s not supported' % extension) + + # deal with specific tar + mode = split_ext[1] + supported_mode = ['xz', 'gz', 'bz2'] + if mode not in supported_mode: + raise ValueError( + 'Development error, only %s supported, %s not supported' % ( + supported_mode, mode)) + files = tarball._ls(dir_path) + with tarfile.open(path, 'w:%s' % mode) as t: + for fpath, fname in files: + t.add(fpath, arcname=fname, recursive=False) + + return path + + +def create_arborescence_archive(root_path, archive_name, filename, content, + up_to_size=None, extension='zip'): """Build an archive named archive_name in the root_path. This archive contains one file named filename with the content content. + Args: + root_path (str): Location path of the archive to create + archive_name (str): Archive's name (without extension) + filename (str): Archive's content is only one filename + content (bytes): Content of the filename + up_to_size (int | None): Fill in the blanks size to oversize + or complete an archive's size + extension (str): Extension of the archive to write (default is zip) + Returns: dict with the keys: - dir: the directory of that archive - path: full path to the archive - sha1sum: archive's sha1sum - length: archive's length """ os.makedirs(root_path, exist_ok=True) archive_path_dir = tempfile.mkdtemp(dir=root_path) dir_path = os.path.join(archive_path_dir, archive_name) os.mkdir(dir_path) filepath = os.path.join(dir_path, filename) _length = len(content) count = 0 batch_size = 128 with open(filepath, 'wb') as f: f.write(content) if up_to_size: # fill with blank content up to a given size count += _length while count < up_to_size: f.write(b'0'*batch_size) count += batch_size - zip_path = dir_path + '.zip' - zip_path = tarball.compress(zip_path, 'zip', dir_path) - return compute_info(zip_path) + _path = '%s.%s' % (dir_path, extension) + _path = _compress(_path, extension, dir_path) + return compute_info(_path) def create_archive_with_archive(root_path, name, archive): """Create an archive holding another. """ invalid_archive_path = os.path.join(root_path, name) with tarfile.open(invalid_archive_path, 'w:gz') as _archive: _archive.add(archive['path'], arcname=archive['name']) return compute_info(invalid_archive_path) @attr('fs') class FileSystemCreationRoutine(TestCase): """Mixin intended for tests needed to tamper with archives. """ def setUp(self): """Define the test client and other test variables.""" super().setUp() self.root_path = '/tmp/swh-deposit/test/build-zip/' os.makedirs(self.root_path, exist_ok=True) - self.archive = create_arborescence_zip( + self.archive = create_arborescence_archive( self.root_path, 'archive1', 'file1', b'some content in file') self.atom_entry = b""" Awesome Compiler urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author https://hal-test.archives-ouvertes.fr """ def tearDown(self): super().tearDown() shutil.rmtree(self.root_path) def create_simple_binary_deposit(self, status_partial=True): response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/zip', data=self.archive['data'], CONTENT_LENGTH=self.archive['length'], HTTP_MD5SUM=self.archive['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive['name'], )) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] if status_partial: expected_status = DEPOSIT_STATUS_PARTIAL else: expected_status = DEPOSIT_STATUS_VERIFIED self.assertEqual(_status, expected_status) deposit_id = int(response_content['deposit_id']) return deposit_id def create_complex_binary_deposit(self, status_partial=False): deposit_id = self.create_simple_binary_deposit( status_partial=True) # Add a second archive to the deposit # update its status to DEPOSIT_STATUS_VERIFIED response = self.client.post( reverse(EM_IRI, args=[self.collection.name, deposit_id]), content_type='application/zip', data=self.archive2['data'], CONTENT_LENGTH=self.archive2['length'], HTTP_MD5SUM=self.archive2['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip') # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id - def create_deposit_archive_with_archive(self): + def create_deposit_archive_with_archive(self, archive_extension): + # we create the holding archive to a given extension + archive = create_arborescence_archive( + self.root_path, 'archive1', 'file1', b'some content in file', + extension=archive_extension) + + # now we create an archive holding the first created archive invalid_archive = create_archive_with_archive( - self.root_path, 'invalid.tar.gz', self.archive) + self.root_path, 'invalid.tar.gz', archive) + # we deposit it response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/x-tar', data=invalid_archive['data'], CONTENT_LENGTH=invalid_archive['length'], HTTP_MD5SUM=invalid_archive['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=False, HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( invalid_archive['name'], )) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] self.assertEqual(_status, DEPOSIT_STATUS_DEPOSITED) deposit_id = int(response_content['deposit_id']) return deposit_id def update_binary_deposit(self, deposit_id, status_partial=False): # update existing deposit with atom entry metadata response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.codemeta_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] if status_partial: expected_status = DEPOSIT_STATUS_PARTIAL else: expected_status = DEPOSIT_STATUS_DEPOSITED self.assertEqual(_status, expected_status) deposit_id = int(response_content['deposit_id']) return deposit_id @attr('fs') class BasicTestCase(TestCase): """Mixin intended for data setup purposes (user, collection, etc...) """ def setUp(self): """Define the test client and other test variables.""" super().setUp() # expanding diffs in tests self.maxDiff = None # basic minimum test data deposit_request_types = {} # Add deposit request types for deposit_request_type in ['archive', 'metadata']: drt = DepositRequestType(name=deposit_request_type) drt.save() deposit_request_types[deposit_request_type] = drt _name = 'hal' _provider_url = 'https://hal-test.archives-ouvertes.fr/' _domain = 'archives-ouvertes.fr/' # set collection up _collection = DepositCollection(name=_name) _collection.save() # set user/client up _client = DepositClient.objects.create_user(username=_name, password=_name, provider_url=_provider_url, domain=_domain) _client.collections = [_collection.id] _client.last_name = _name _client.save() self.collection = _collection self.user = _client self.username = _name self.userpass = _name self.deposit_request_types = deposit_request_types def tearDown(self): super().tearDown() # Clean up uploaded files in temporary directory (tests have # their own media root folder) if os.path.exists(MEDIA_ROOT): for d in os.listdir(MEDIA_ROOT): shutil.rmtree(os.path.join(MEDIA_ROOT, d)) class WithAuthTestCase(TestCase): """Mixin intended for testing the api with basic authentication. """ def setUp(self): super().setUp() _token = '%s:%s' % (self.username, self.userpass) token = base64.b64encode(_token.encode('utf-8')) authorization = 'Basic %s' % token.decode('utf-8') self.client.credentials(HTTP_AUTHORIZATION=authorization) def tearDown(self): super().tearDown() self.client.credentials() class CommonCreationRoutine(TestCase): """Mixin class to share initialization routine. cf: `class`:test_deposit_update.DepositReplaceExistingDataTest `class`:test_deposit_update.DepositUpdateDepositWithNewDataTest `class`:test_deposit_update.DepositUpdateFailuresTest `class`:test_deposit_delete.DepositDeleteTest """ def setUp(self): super().setUp() self.atom_entry_data0 = b""" some-external-id https://hal-test.archives-ouvertes.fr/some-external-id """ self.atom_entry_data1 = b""" some awesome author """ self.atom_entry_data2 = b""" Awesome Compiler urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author https://hal-test.archives-ouvertes.fr/id """ self.codemeta_entry_data0 = b""" Awesome Compiler https://hal-test.archives-ouvertes.fr/1785io25c695 urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author description key-word 1 """ self.codemeta_entry_data1 = b""" Composing a Web of Audio Applications hal hal-01243065 hal-01243065 https://hal-test.archives-ouvertes.fr/hal-01243065 test DSP programming,Web 2017-05-03T16:08:47+02:00 this is the description 1 phpstorm stable php python C GNU General Public License v3.0 only CeCILL Free Software License Agreement v1.1 HAL hal@ccsd.cnrs.fr Morane Gruenpeter """ def create_deposit_with_invalid_archive(self, external_id='some-external-id-1'): url = reverse(COL_IRI, args=[self.collection.name]) data = b'some data which is clearly not a zip file' md5sum = hashlib.md5(data).hexdigest() # when response = self.client.post( url, content_type='application/zip', # as zip data=data, # + headers CONTENT_LENGTH=len(data), # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=md5sum, HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def create_deposit_with_status( self, status, external_id='some-external-id-1', swh_id=None, status_detail=None): # create an invalid deposit which we will update further down the line deposit_id = self.create_deposit_with_invalid_archive(external_id) # We cannot create some form of deposit with a given status in # test context ('rejected' for example). Update in place the # deposit with such status to permit some further tests. deposit = Deposit.objects.get(pk=deposit_id) if status == DEPOSIT_STATUS_REJECTED: deposit.status_detail = status_detail deposit.status = status if swh_id: deposit.swh_id = swh_id deposit.save() return deposit_id def create_simple_deposit_partial(self, external_id='some-external-id'): """Create a simple deposit (1 request) in `partial` state and returns its new identifier. Returns: deposit id """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data0, HTTP_SLUG=external_id, HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def create_deposit_partial_with_data_in_args(self, data): """Create a simple deposit (1 request) in `partial` state with the data or metadata as an argument and returns its new identifier. Args: data: atom entry Returns: deposit id """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=data, HTTP_SLUG='external-id', HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def _update_deposit_with_status(self, deposit_id, status_partial=False): """Add to a given deposit another archive and update its current status to `deposited` (by default). Returns: deposit id """ # when response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) # then assert response.status_code == status.HTTP_201_CREATED return deposit_id def create_deposit_ready(self, external_id='some-external-id'): """Create a complex deposit (2 requests) in status `deposited`. """ deposit_id = self.create_simple_deposit_partial( external_id=external_id) deposit_id = self._update_deposit_with_status(deposit_id) return deposit_id def create_deposit_partial(self, external_id='some-external-id'): """Create a complex deposit (2 requests) in status `partial`. """ deposit_id = self.create_simple_deposit_partial( external_id=external_id) deposit_id = self._update_deposit_with_status( deposit_id, status_partial=True) return deposit_id def add_metadata_to_deposit(self, deposit_id, status_partial=False): """Add metadata to deposit. """ # when response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.codemeta_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) assert response.status_code == status.HTTP_201_CREATED # then deposit = Deposit.objects.get(pk=deposit_id) assert deposit is not None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert deposit_requests is not [] for dr in deposit_requests: if dr.type.name == 'metadata': assert deposit_requests[0].metadata is not {} return deposit_id