diff --git a/swh/deposit/api/private/deposit_check.py b/swh/deposit/api/private/deposit_check.py index ecfc2caa..eae73e38 100644 --- a/swh/deposit/api/private/deposit_check.py +++ b/swh/deposit/api/private/deposit_check.py @@ -1,239 +1,236 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import re - +import tarfile +import zipfile from rest_framework import status -from tarfile import TarFile, is_tarfile -from zipfile import ZipFile, is_zipfile from . import DepositReadMixin from ..common import SWHGetDepositAPI, SWHPrivateAPIView from ...config import DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_REJECTED from ...config import ARCHIVE_TYPE from ...models import Deposit MANDATORY_FIELDS_MISSING = 'Mandatory fields are missing' ALTERNATE_FIELDS_MISSING = 'Mandatory alternate fields are missing' MANDATORY_ARCHIVE_UNREADABLE = 'Deposit was rejected because at least one of its associated archives was not readable' # noqa MANDATORY_ARCHIVE_INVALID = 'Mandatory archive is invalid (e.g contains an archive)' # noqa MANDATORY_ARCHIVE_UNSUPPORTED = 'Mandatory archive type is not supported' MANDATORY_ARCHIVE_MISSING = 'Deposit without archive is rejected' INCOMPATIBLE_URL_FIELDS = "At least one url field must be compatible with the client's domain name" # noqa class SWHChecksDeposit(SWHGetDepositAPI, SWHPrivateAPIView, DepositReadMixin): """Dedicated class to read a deposit's raw archives content. Only GET is supported. """ def _check_deposit_archives(self, deposit): """Given a deposit, check each deposit request of type archive. Args: The deposit to check archives for Returns tuple (status, error_detail): True, None if all archives are ok, (False, ) otherwise. """ requests = list(self._deposit_requests( deposit, request_type=ARCHIVE_TYPE)) if len(requests) == 0: # no associated archive is refused return False, { 'archive': [{ 'summary': MANDATORY_ARCHIVE_MISSING, }] } errors = [] for archive_request in requests: check, error_message = self._check_archive(archive_request) if not check: errors.append({ 'summary': error_message, 'fields': archive_request.id }) if not errors: return True, None return False, { 'archive': errors } def _check_archive(self, archive_request): """Check that a given archive is actually ok: - reading ok - content of the archive at the first level is not only an archive. Args: archive_path (DepositRequest): Archive to check Returns: True if archive is check compliant, False otherwise. """ archive_path = archive_request.archive.path try: - if is_zipfile(archive_path): - with ZipFile(archive_path) as f: + if zipfile.is_zipfile(archive_path): + with zipfile.ZipFile(archive_path) as f: files = f.namelist() - elif is_tarfile(archive_path): - with TarFile(archive_path) as f: + elif tarfile.is_tarfile(archive_path): + with tarfile.open(archive_path) as f: files = f.getnames() else: return False, MANDATORY_ARCHIVE_UNSUPPORTED - - if len(files) > 1: - return True, None - - element = files[0] - pattern = re.compile( - r'.*\.(zip|tar|tar.gz|.xz|tar.xz|Z|.tar.Z|bz2|tar.bz2)$') - if pattern.match(element): # invalid archive in archive - return False, MANDATORY_ARCHIVE_INVALID except Exception: return False, MANDATORY_ARCHIVE_UNREADABLE + if len(files) > 1: + return True, None + element = files[0] + pattern = re.compile( + r'.*\.(zip|tar|tar.gz|.xz|tar.xz|Z|.tar.Z|bz2|tar.bz2)$') + if pattern.match(element): # invalid archive in archive + return False, MANDATORY_ARCHIVE_INVALID return True, None def _check_metadata(self, metadata): """Check to execute on all metadata for mandatory field presence. Args: metadata (dict): Metadata dictionary to check for mandatory fields Returns: tuple (status, error_detail): True, None if metadata are ok (False, ) otherwise. """ required_fields = { 'url': False, 'external_identifier': False, 'author': False, } alternate_fields = { ('name', 'title'): False, # alternate field, at least one # of them must be present } for field, value in metadata.items(): for name in required_fields: if name in field: required_fields[name] = True for possible_names in alternate_fields: for possible_name in possible_names: if possible_name in field: alternate_fields[possible_names] = True continue mandatory_result = [k for k, v in required_fields.items() if not v] optional_result = [ ' or '.join(k) for k, v in alternate_fields.items() if not v] if mandatory_result == [] and optional_result == []: return True, None detail = [] if mandatory_result != []: detail.append({ 'summary': MANDATORY_FIELDS_MISSING, 'fields': mandatory_result }) if optional_result != []: detail.append({ 'summary': ALTERNATE_FIELDS_MISSING, 'fields': optional_result, }) return False, { 'metadata': detail } def _check_url(self, client_domain, metadata): """Check compatibility between client_domain and url field in metadata Args: client_domain (str): url associated with the deposit's client metadata (dict): Metadata where to find url Returns: tuple (status, error_detail): True, None if url associated with the deposit's client is ok, (False, ) otherwise. """ url_fields = [] for field in metadata: if 'url' in field: if client_domain in metadata[field]: return True, None url_fields.append(field) detail = { 'url': { 'summary': INCOMPATIBLE_URL_FIELDS, } } if url_fields: detail['url']['fields'] = url_fields return False, detail def process_get(self, req, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. Args: req (Request): collection_name (str): Collection owning the deposit deposit_id (id): Deposit concerned by the reading Returns: Tuple status, stream of content, content-type """ deposit = Deposit.objects.get(pk=deposit_id) client_domain = deposit.client.domain metadata = self._metadata_get(deposit) problems = {} # will check each deposit's associated request (both of type # archive and metadata) for errors archives_status, error_detail = self._check_deposit_archives(deposit) if not archives_status: problems.update(error_detail) metadata_status, error_detail = self._check_metadata(metadata) if not metadata_status: problems.update(error_detail) url_status, error_detail = self._check_url(client_domain, metadata) if not url_status: problems.update(error_detail) deposit_status = archives_status and metadata_status and url_status # if any problems arose, the deposit is rejected if not deposit_status: deposit.status = DEPOSIT_STATUS_REJECTED deposit.status_detail = problems response = { 'status': deposit.status, 'details': deposit.status_detail, } else: deposit.status = DEPOSIT_STATUS_VERIFIED response = { 'status': deposit.status, } deposit.save() return status.HTTP_200_OK, json.dumps(response), 'application/json' diff --git a/swh/deposit/tests/api/test_deposit_check.py b/swh/deposit/tests/api/test_deposit_check.py index 6480e1a4..633a5dbe 100644 --- a/swh/deposit/tests/api/test_deposit_check.py +++ b/swh/deposit/tests/api/test_deposit_check.py @@ -1,192 +1,230 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest from django.core.urlresolvers import reverse from nose.tools import istest from nose.plugins.attrib import attr from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.config import ( DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED ) from swh.deposit.api.private.deposit_check import ( - SWHChecksDeposit, + SWHChecksDeposit, MANDATORY_ARCHIVE_INVALID, MANDATORY_FIELDS_MISSING, INCOMPATIBLE_URL_FIELDS, MANDATORY_ARCHIVE_UNSUPPORTED, ALTERNATE_FIELDS_MISSING ) from swh.deposit.models import Deposit from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ..common import FileSystemCreationRoutine @attr('fs') class CheckDepositTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine): """Check deposit endpoints. """ def setUp(self): super().setUp() @istest def deposit_ok(self): """Proper deposit should succeed the checks (-> status ready) """ deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit_id = self.update_binary_deposit(deposit_id, status_partial=False) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED) @istest - def deposit_ko(self): + def deposit_with_tarball_with_one_tarball_invalid(self): + """Deposit with tarball (of 1 tarball) should fail the checks: rejected + + """ + deposit_id = self.create_deposit_archive_with_archive() + + deposit = Deposit.objects.get(pk=deposit_id) + self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) + + url = reverse(PRIVATE_CHECK_DEPOSIT, + args=[self.collection.name, deposit.id]) + + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = json.loads(response.content.decode('utf-8')) + self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) + details = data['details'] + # archive checks failure + self.assertEqual(len(details['archive']), 1) + self.assertEqual(details['archive'][0]['summary'], + MANDATORY_ARCHIVE_INVALID) + # metadata check failure + self.assertEqual(len(details['metadata']), 2) + mandatory = details['metadata'][0] + self.assertEqual(mandatory['summary'], MANDATORY_FIELDS_MISSING) + self.assertEqual(set(mandatory['fields']), + set(['url', 'external_identifier', 'author'])) + alternate = details['metadata'][1] + self.assertEqual(alternate['summary'], ALTERNATE_FIELDS_MISSING) + self.assertEqual(alternate['fields'], ['name or title']) + # url check failure + self.assertEqual(details['url']['summary'], INCOMPATIBLE_URL_FIELDS) + + deposit = Deposit.objects.get(pk=deposit.id) + self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) + + @istest + def deposit_ko_not_a_valid_tarball(self): """Invalid deposit should fail the checks (-> status rejected) """ deposit_id = self.create_deposit_with_invalid_archive() deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) details = data['details'] # archive checks failure self.assertEqual(len(details['archive']), 1) self.assertEqual(details['archive'][0]['summary'], MANDATORY_ARCHIVE_UNSUPPORTED) # metadata check failure self.assertEqual(len(details['metadata']), 2) mandatory = details['metadata'][0] self.assertEqual(mandatory['summary'], MANDATORY_FIELDS_MISSING) self.assertEqual(set(mandatory['fields']), set(['url', 'external_identifier', 'author'])) alternate = details['metadata'][1] self.assertEqual(alternate['summary'], ALTERNATE_FIELDS_MISSING) self.assertEqual(alternate['fields'], ['name or title']) # url check failure self.assertEqual(details['url']['summary'], INCOMPATIBLE_URL_FIELDS) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) @istest def check_deposit_metadata_ok(self): """Proper deposit should succeed the checks (-> status ready) with all **MUST** metadata using the codemeta metadata test set """ deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit_id_metadata = self.add_metadata_to_deposit(deposit_id) self.assertEquals(deposit_id, deposit_id_metadata) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED) class CheckMetadata(unittest.TestCase, SWHChecksDeposit): @istest def check_metadata_ok(self): actual_check, detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'name': 'foo', 'author': 'someone', }) self.assertTrue(actual_check) self.assertIsNone(detail) @istest def check_metadata_ok2(self): actual_check, detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'bar', 'author': 'someone', }) self.assertTrue(actual_check) self.assertIsNone(detail) @istest def check_metadata_ko(self): """Missing optional field should be caught """ actual_check, error_detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'author': 'someone', }) expected_error = { 'metadata': [{ 'summary': 'Mandatory alternate fields are missing', 'fields': ['name or title'], }] } self.assertFalse(actual_check) self.assertEqual(error_detail, expected_error) @istest def check_metadata_ko2(self): """Missing mandatory fields should be caught """ actual_check, error_detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'foobar', }) expected_error = { 'metadata': [{ 'summary': 'Mandatory fields are missing', 'fields': ['author'], }] } self.assertFalse(actual_check) self.assertEqual(error_detail, expected_error) diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py index 93d38ad0..c15f442d 100644 --- a/swh/deposit/tests/common.py +++ b/swh/deposit/tests/common.py @@ -1,478 +1,518 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import hashlib import os import shutil +import tarfile import tempfile from django.core.urlresolvers import reverse from django.test import TestCase from io import BytesIO from nose.plugins.attrib import attr from rest_framework import status from swh.deposit.config import (COL_IRI, EM_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_DEPOSITED) from swh.deposit.models import DepositClient, DepositCollection, Deposit from swh.deposit.models import DepositRequest from swh.deposit.models import DepositRequestType from swh.deposit.parsers import parse_xml from swh.deposit.settings.testing import MEDIA_ROOT from swh.core import tarball +def compute_info(archive_path): + """Given a path, compute information on path. + + """ + with open(archive_path, 'rb') as f: + length = 0 + sha1sum = hashlib.sha1() + md5sum = hashlib.md5() + data = b'' + for chunk in f: + sha1sum.update(chunk) + md5sum.update(chunk) + length += len(chunk) + data += chunk + + return { + 'dir': os.path.dirname(archive_path), + 'name': os.path.basename(archive_path), + 'path': archive_path, + 'length': length, + 'sha1sum': sha1sum.hexdigest(), + 'md5sum': md5sum.hexdigest(), + 'data': data + } + + def create_arborescence_zip(root_path, archive_name, filename, content, up_to_size=None): """Build an archive named archive_name in the root_path. This archive contains one file named filename with the content content. Returns: dict with the keys: - dir: the directory of that archive - path: full path to the archive - sha1sum: archive's sha1sum - length: archive's length """ os.makedirs(root_path, exist_ok=True) archive_path_dir = tempfile.mkdtemp(dir=root_path) dir_path = os.path.join(archive_path_dir, archive_name) os.mkdir(dir_path) filepath = os.path.join(dir_path, filename) _length = len(content) count = 0 batch_size = 128 with open(filepath, 'wb') as f: f.write(content) if up_to_size: # fill with blank content up to a given size count += _length while count < up_to_size: f.write(b'0'*batch_size) count += batch_size zip_path = dir_path + '.zip' zip_path = tarball.compress(zip_path, 'zip', dir_path) + return compute_info(zip_path) - with open(zip_path, 'rb') as f: - length = 0 - sha1sum = hashlib.sha1() - md5sum = hashlib.md5() - data = b'' - for chunk in f: - sha1sum.update(chunk) - md5sum.update(chunk) - length += len(chunk) - data += chunk - return { - 'dir': archive_path_dir, - 'name': archive_name, - 'data': data, - 'path': zip_path, - 'sha1sum': sha1sum.hexdigest(), - 'md5sum': md5sum.hexdigest(), - 'length': length, - } +def create_archive_with_archive(root_path, name, archive): + """Create an archive holding another. + + """ + invalid_archive_path = os.path.join(root_path, name) + with tarfile.open(invalid_archive_path, 'w:gz') as _archive: + _archive.add(archive['path'], arcname=archive['name']) + return compute_info(invalid_archive_path) @attr('fs') class FileSystemCreationRoutine(TestCase): """Mixin intended for tests needed to tamper with archives. """ def setUp(self): """Define the test client and other test variables.""" super().setUp() self.root_path = '/tmp/swh-deposit/test/build-zip/' os.makedirs(self.root_path, exist_ok=True) self.archive = create_arborescence_zip( self.root_path, 'archive1', 'file1', b'some content in file') self.atom_entry = b""" Awesome Compiler urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author https://hal-test.archives-ouvertes.fr """ def tearDown(self): super().tearDown() shutil.rmtree(self.root_path) def create_simple_binary_deposit(self, status_partial=True): response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/zip', data=self.archive['data'], CONTENT_LENGTH=self.archive['length'], HTTP_MD5SUM=self.archive['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive['name'], )) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] if status_partial: expected_status = DEPOSIT_STATUS_PARTIAL else: expected_status = DEPOSIT_STATUS_VERIFIED self.assertEqual(_status, expected_status) deposit_id = int(response_content['deposit_id']) return deposit_id def create_complex_binary_deposit(self, status_partial=False): deposit_id = self.create_simple_binary_deposit( status_partial=True) # Add a second archive to the deposit # update its status to DEPOSIT_STATUS_VERIFIED response = self.client.post( reverse(EM_IRI, args=[self.collection.name, deposit_id]), content_type='application/zip', data=self.archive2['data'], CONTENT_LENGTH=self.archive2['length'], HTTP_MD5SUM=self.archive2['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip') # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id + def create_deposit_archive_with_archive(self): + invalid_archive = create_archive_with_archive( + self.root_path, 'invalid.tar.gz', self.archive) + + response = self.client.post( + reverse(COL_IRI, args=[self.collection.name]), + content_type='application/x-tar', + data=invalid_archive['data'], + CONTENT_LENGTH=invalid_archive['length'], + HTTP_MD5SUM=invalid_archive['md5sum'], + HTTP_SLUG='external-id', + HTTP_IN_PROGRESS=False, + HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( + invalid_archive['name'], )) + + # then + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + response_content = parse_xml(BytesIO(response.content)) + _status = response_content['deposit_status'] + self.assertEqual(_status, DEPOSIT_STATUS_DEPOSITED) + deposit_id = int(response_content['deposit_id']) + return deposit_id + def update_binary_deposit(self, deposit_id, status_partial=False): # update existing deposit with atom entry metadata response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.codemeta_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] if status_partial: expected_status = DEPOSIT_STATUS_PARTIAL else: expected_status = DEPOSIT_STATUS_DEPOSITED self.assertEqual(_status, expected_status) deposit_id = int(response_content['deposit_id']) return deposit_id @attr('fs') class BasicTestCase(TestCase): """Mixin intended for data setup purposes (user, collection, etc...) """ def setUp(self): """Define the test client and other test variables.""" super().setUp() # expanding diffs in tests self.maxDiff = None # basic minimum test data deposit_request_types = {} # Add deposit request types for deposit_request_type in ['archive', 'metadata']: drt = DepositRequestType(name=deposit_request_type) drt.save() deposit_request_types[deposit_request_type] = drt _name = 'hal' _provider_url = 'https://hal-test.archives-ouvertes.fr/' _domain = 'archives-ouvertes.fr/' # set collection up _collection = DepositCollection(name=_name) _collection.save() # set user/client up _client = DepositClient.objects.create_user(username=_name, password=_name, provider_url=_provider_url, domain=_domain) _client.collections = [_collection.id] _client.last_name = _name _client.save() self.collection = _collection self.user = _client self.username = _name self.userpass = _name self.deposit_request_types = deposit_request_types def tearDown(self): super().tearDown() # Clean up uploaded files in temporary directory (tests have # their own media root folder) if os.path.exists(MEDIA_ROOT): for d in os.listdir(MEDIA_ROOT): shutil.rmtree(os.path.join(MEDIA_ROOT, d)) class WithAuthTestCase(TestCase): """Mixin intended for testing the api with basic authentication. """ def setUp(self): super().setUp() _token = '%s:%s' % (self.username, self.userpass) token = base64.b64encode(_token.encode('utf-8')) authorization = 'Basic %s' % token.decode('utf-8') self.client.credentials(HTTP_AUTHORIZATION=authorization) def tearDown(self): super().tearDown() self.client.credentials() class CommonCreationRoutine(TestCase): """Mixin class to share initialization routine. cf: `class`:test_deposit_update.DepositReplaceExistingDataTest `class`:test_deposit_update.DepositUpdateDepositWithNewDataTest `class`:test_deposit_update.DepositUpdateFailuresTest `class`:test_deposit_delete.DepositDeleteTest """ def setUp(self): super().setUp() self.atom_entry_data0 = b""" some-external-id https://hal-test.archives-ouvertes.fr/some-external-id """ self.atom_entry_data1 = b""" some awesome author """ self.atom_entry_data2 = b""" Awesome Compiler urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author https://hal-test.archives-ouvertes.fr/id """ self.codemeta_entry_data0 = b""" Awesome Compiler https://hal-test.archives-ouvertes.fr/1785io25c695 urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author description key-word 1 """ self.codemeta_entry_data1 = b""" Composing a Web of Audio Applications hal hal-01243065 hal-01243065 https://hal-test.archives-ouvertes.fr/hal-01243065 test DSP programming,Web 2017-05-03T16:08:47+02:00 this is the description 1 phpstorm stable php python C GNU General Public License v3.0 only CeCILL Free Software License Agreement v1.1 HAL hal@ccsd.cnrs.fr Morane Gruenpeter """ def create_deposit_with_invalid_archive(self, external_id='some-external-id-1'): url = reverse(COL_IRI, args=[self.collection.name]) data = b'some data which is clearly not a zip file' md5sum = hashlib.md5(data).hexdigest() # when response = self.client.post( url, content_type='application/zip', # as zip data=data, # + headers CONTENT_LENGTH=len(data), # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=md5sum, HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def create_deposit_with_status( self, status, external_id='some-external-id-1', swh_id=None, status_detail=None): # create an invalid deposit which we will update further down the line deposit_id = self.create_deposit_with_invalid_archive(external_id) # We cannot create some form of deposit with a given status in # test context ('rejected' for example). Update in place the # deposit with such status to permit some further tests. deposit = Deposit.objects.get(pk=deposit_id) if status == DEPOSIT_STATUS_REJECTED: deposit.status_detail = status_detail deposit.status = status if swh_id: deposit.swh_id = swh_id deposit.save() return deposit_id def create_simple_deposit_partial(self, external_id='some-external-id'): """Create a simple deposit (1 request) in `partial` state and returns its new identifier. Returns: deposit id """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data0, HTTP_SLUG=external_id, HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def create_deposit_partial_with_data_in_args(self, data): """Create a simple deposit (1 request) in `partial` state with the data or metadata as an argument and returns its new identifier. Args: data: atom entry Returns: deposit id """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=data, HTTP_SLUG='external-id', HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def _update_deposit_with_status(self, deposit_id, status_partial=False): """Add to a given deposit another archive and update its current status to `deposited` (by default). Returns: deposit id """ # when response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) # then assert response.status_code == status.HTTP_201_CREATED return deposit_id def create_deposit_ready(self, external_id='some-external-id'): """Create a complex deposit (2 requests) in status `deposited`. """ deposit_id = self.create_simple_deposit_partial( external_id=external_id) deposit_id = self._update_deposit_with_status(deposit_id) return deposit_id def create_deposit_partial(self, external_id='some-external-id'): """Create a complex deposit (2 requests) in status `partial`. """ deposit_id = self.create_simple_deposit_partial( external_id=external_id) deposit_id = self._update_deposit_with_status( deposit_id, status_partial=True) return deposit_id def add_metadata_to_deposit(self, deposit_id, status_partial=False): """Add metadata to deposit. """ # when response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.codemeta_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) assert response.status_code == status.HTTP_201_CREATED # then deposit = Deposit.objects.get(pk=deposit_id) assert deposit is not None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert deposit_requests is not [] for dr in deposit_requests: if dr.type.name == 'metadata': assert deposit_requests[0].metadata is not {} return deposit_id