diff --git a/swh/deposit/api/private/deposit_check.py b/swh/deposit/api/private/deposit_check.py index 7fa3cc7d..786fb4cc 100644 --- a/swh/deposit/api/private/deposit_check.py +++ b/swh/deposit/api/private/deposit_check.py @@ -1,174 +1,181 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import zipfile from rest_framework import status from ..common import SWHGetDepositAPI, SWHPrivateAPIView from ...config import DEPOSIT_STATUS_READY, DEPOSIT_STATUS_REJECTED from ...config import ARCHIVE_TYPE, METADATA_TYPE from ...models import Deposit, DepositRequest class SWHChecksDeposit(SWHGetDepositAPI, SWHPrivateAPIView): """Dedicated class to read a deposit's raw archives content. Only GET is supported. """ def _deposit_requests(self, deposit, request_type): """Given a deposit, yields its associated deposit_request Args: deposit (Deposit): Deposit to list requests for request_type (str): Archive or metadata type Yields: deposit requests of type request_type associated to the deposit """ deposit_requests = DepositRequest.objects.filter( type=self.deposit_request_types[request_type], deposit=deposit).order_by('id') for deposit_request in deposit_requests: yield deposit_request def _check_deposit_archives(self, deposit): """Given a deposit, check each deposit request of type archive. Args: The deposit to check archives for Returns True if all archives are ok, False otherwise. """ requests = list(self._deposit_requests( deposit, request_type=ARCHIVE_TYPE)) if len(requests) == 0: # no associated archive is refused return False for dr in requests: check = self._check_archive(dr.archive) if not check: return False return True def _check_archive(self, archive): """Check that a given archive is actually ok for reading. Args: archive (File): Archive to check Returns: True if archive is successfully read, False otherwise. """ try: zf = zipfile.ZipFile(archive.path) zf.infolist() except Exception as e: return False else: return True - def _check_deposit_metadata(self, deposit): - """Given a deposit, check each deposit request of type metadata, - by aggregating all metadata requests one bundle. + def _metadata_get(self, deposit): + """Given a deposit, aggregate all metadata requests. Args: The deposit to check metadata for. Returns: True if the deposit's associated metadata are ok, False otherwise. """ metadata = {} for dr in self._deposit_requests(deposit, request_type=METADATA_TYPE): metadata.update(dr.metadata) - return self._check_metadata(metadata) + return metadata def _check_metadata(self, metadata): - """Check to execute on all metadata and keeps metadata_url for url validation. + """Check to execute on all metadata for mandatory field presence. Args: - metadata (): Metadata to actually check + metadata (dict): Metadata to actually check Returns: True if metadata is ok, False otherwise. """ required_fields = (('url',), ('external_identifier',), ('name', 'title'), ('author',)) result = all(any(name in field for field in metadata for name in possible_names) for possible_names in required_fields) - urls = [] + return result + + def _check_url(self, client_url, metadata): + """Check compatibility between client_url and url field in metadata + + Args: + client_url (str): url associated with the deposit's client + metadata (dict): Metadata where to find url + Returns: + True if url is ok, False otherwise. + + """ + metadata_urls = [] for field in metadata: if 'url' in field: - urls.append(metadata[field]) - self.metadata_url = urls - return result + metadata_urls.append(metadata[field]) - def _check_url(self, client_url, metadata_urls): - validatation = any(client_url in url - for url in metadata_urls) - return validatation + return any(client_url in url + for url in metadata_urls) def process_get(self, req, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. Args: req (Request): collection_name (str): Collection owning the deposit deposit_id (id): Deposit concerned by the reading Returns: Tuple status, stream of content, content-type """ deposit = Deposit.objects.get(pk=deposit_id) client_url = deposit.client.url - self.metadata_url = None # created in _check_metadata + metadata = self._metadata_get(deposit) problems = [] # will check each deposit's associated request (both of type # archive and metadata) for errors archives_status = self._check_deposit_archives(deposit) if not archives_status: problems.append('archive(s)') - metadata_status = self._check_deposit_metadata(deposit) + metadata_status = self._check_metadata(metadata) if not metadata_status: problems.append('metadata') - url_status = self._check_url(client_url, self.metadata_url) + url_status = self._check_url(client_url, metadata) if not url_status: problems.append('url') deposit_status = archives_status and metadata_status and url_status # if any problems arose, the deposit is rejected if not deposit_status: deposit.status = DEPOSIT_STATUS_REJECTED else: deposit.status = DEPOSIT_STATUS_READY deposit.save() return (status.HTTP_200_OK, json.dumps({ 'status': deposit.status, 'details': 'Some %s failed the checks.' % ( ' and '.join(problems), ), }), 'application/json') diff --git a/swh/deposit/api/private/deposit_read.py b/swh/deposit/api/private/deposit_read.py index 699c6ac2..1186cc0a 100644 --- a/swh/deposit/api/private/deposit_read.py +++ b/swh/deposit/api/private/deposit_read.py @@ -1,235 +1,235 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import shutil import tempfile from contextlib import contextmanager from django.http import FileResponse from rest_framework import status from swh.core import tarball from swh.model import identifiers from ...config import SWH_PERSON from ..common import SWHGetDepositAPI, SWHPrivateAPIView from ...models import Deposit, DepositRequest @contextmanager def aggregate_tarballs(extraction_dir, archive_paths): """Aggregate multiple tarballs into one and returns this new archive's path. Args: extraction_dir (path): Path to use for the tarballs computation archive_paths ([str]): Deposit's archive paths Returns: Tuple (directory to clean up, archive path (aggregated or not)) """ if len(archive_paths) > 1: # need to rebuild one archive # from multiple ones os.makedirs(extraction_dir, 0o755, exist_ok=True) dir_path = tempfile.mkdtemp(prefix='swh.deposit-', dir=extraction_dir) # root folder to build an aggregated tarball aggregated_tarball_rootdir = os.path.join(dir_path, 'aggregate') os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True) # uncompress in a temporary location all archives for archive_path in archive_paths: tarball.uncompress(archive_path, aggregated_tarball_rootdir) # Aggregate into one big tarball the multiple smaller ones temp_tarpath = tarball.compress( aggregated_tarball_rootdir + '.zip', nature='zip', dirpath_or_files=aggregated_tarball_rootdir) # can already clean up temporary directory shutil.rmtree(aggregated_tarball_rootdir) try: yield temp_tarpath finally: shutil.rmtree(dir_path) else: # only 1 archive, no need to do fancy actions (and no cleanup step) yield archive_paths[0] class SWHDepositReadArchives(SWHGetDepositAPI, SWHPrivateAPIView): """Dedicated class to read a deposit's raw archives content. Only GET is supported. """ ADDITIONAL_CONFIG = { 'extraction_dir': ('str', '/tmp/swh-deposit/archive/'), } def __init__(self): super().__init__() self.extraction_dir = self.config['extraction_dir'] if not os.path.exists(self.extraction_dir): os.makedirs(self.extraction_dir) def retrieve_archives(self, deposit_id): """Given a deposit identifier, returns its associated archives' path. Yields: path to deposited archives """ deposit = Deposit.objects.get(pk=deposit_id) deposit_requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']).order_by('id') for deposit_request in deposit_requests: yield deposit_request.archive.path def process_get(self, req, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. Args: req (Request): collection_name (str): Collection owning the deposit deposit_id (id): Deposit concerned by the reading Returns: Tuple status, stream of content, content-type """ archive_paths = list(self.retrieve_archives(deposit_id)) with aggregate_tarballs(self.extraction_dir, archive_paths) as path: return FileResponse(open(path, 'rb'), status=status.HTTP_200_OK, content_type='application/octet-stream') class SWHDepositReadMetadata(SWHGetDepositAPI, SWHPrivateAPIView): """Class in charge of aggregating metadata on a deposit. """ ADDITIONAL_CONFIG = { 'provider': ('dict', { # 'provider_name': '', # those are not set since read from the # 'provider_url': '', # deposit's client 'provider_type': 'deposit_client', 'metadata': {} }), 'tool': ('dict', { 'name': 'swh-deposit', 'version': '0.0.1', 'configuration': { 'sword_version': '2' } }) } def __init__(self): super().__init__() self.provider = self.config['provider'] self.tool = self.config['tool'] def _aggregate_metadata(self, deposit, metadata_requests): """Retrieve and aggregates metadata information. """ metadata = {} for req in metadata_requests: metadata.update(req.metadata) return metadata def _retrieve_url(self, deposit, metadata): client_url = deposit.client.url for field in metadata: if 'url' in field: if client_url in metadata[field]: return metadata[field] def aggregate(self, deposit, requests): """Aggregate multiple data on deposit into one unified data dictionary. Args: deposit (Deposit): Deposit concerned by the data aggregation. requests ([DepositRequest]): List of associated requests which need aggregation. Returns: Dictionary of data representing the deposit to inject in swh. """ data = {} # Retrieve tarballs/metadata information metadata = self._aggregate_metadata(deposit, requests) # create origin_url from metadata only after deposit_check validates it origin_url = self._retrieve_url(deposit, metadata) # Read information metadata data['origin'] = { 'type': 'deposit', 'url': origin_url } # revision fullname = deposit.client.get_full_name() author_committer = SWH_PERSON # metadata provider self.provider['provider_name'] = deposit.client.last_name self.provider['provider_url'] = deposit.client.url revision_type = 'tar' revision_msg = '%s: Deposit %s in collection %s' % ( fullname, deposit.id, deposit.collection.name) complete_date = identifiers.normalize_timestamp(deposit.complete_date) data['revision'] = { 'synthetic': True, 'date': complete_date, 'committer_date': complete_date, 'author': author_committer, 'committer': author_committer, 'type': revision_type, 'message': revision_msg, 'metadata': metadata, } if deposit.parent: parent_revision = deposit.parent.swh_id data['revision']['parents'] = [parent_revision] data['occurrence'] = { 'branch': 'master' } data['origin_metadata'] = { 'provider': self.provider, 'tool': self.tool, 'metadata': metadata } return data def process_get(self, req, collection_name, deposit_id): deposit = Deposit.objects.get(pk=deposit_id) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata']) data = self.aggregate(deposit, requests) d = {} if data: d = json.dumps(data) return status.HTTP_200_OK, d, 'application/json' diff --git a/swh/deposit/tests/api/test_deposit_check.py b/swh/deposit/tests/api/test_deposit_check.py index 445eb8a5..7ae63b34 100644 --- a/swh/deposit/tests/api/test_deposit_check.py +++ b/swh/deposit/tests/api/test_deposit_check.py @@ -1,147 +1,147 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest from django.core.urlresolvers import reverse from nose.tools import istest from nose.plugins.attrib import attr from rest_framework import status from rest_framework.test import APITestCase from ...models import Deposit from ...config import DEPOSIT_STATUS_READY, PRIVATE_CHECK_DEPOSIT from ...config import DEPOSIT_STATUS_READY_FOR_CHECKS, DEPOSIT_STATUS_REJECTED from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ..common import FileSystemCreationRoutine from ...api.private.deposit_check import SWHChecksDeposit @attr('fs') class CheckDepositTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine): """Check deposit endpoints. """ def setUp(self): super().setUp() @istest def deposit_ok(self): """Proper deposit should succeed the checks (-> status ready) """ deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit_id = self.update_binary_deposit(deposit_id, status_partial=False) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_READY_FOR_CHECKS) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data['status'], DEPOSIT_STATUS_READY) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_READY) @istest def deposit_ko(self): """Invalid deposit should fail the checks (-> status rejected) """ deposit_id = self.create_invalid_deposit() deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_READY_FOR_CHECKS) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) self.assertEqual(data['details'], 'Some archive(s) and metadata and url ' + 'failed the checks.') deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) @istest def check_deposit_metadata_ok(self): """Proper deposit should succeed the checks (-> status ready) with all **MUST** metadata using the codemeta metadata test set """ deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit_id_metadata = self.add_metadata_to_deposit(deposit_id) self.assertEquals(deposit_id, deposit_id_metadata) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_READY_FOR_CHECKS) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = json.loads(response.content.decode('utf-8')) self.assertEqual(data['status'], DEPOSIT_STATUS_READY) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_READY) class CheckMetadata(unittest.TestCase, SWHChecksDeposit): @istest def check_metadata_ok(self): actual_check = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'name': 'foo', 'author': 'someone', }) self.assertTrue(actual_check) @istest def check_metadata_ok2(self): actual_check = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'bar', 'author': 'someone', }) self.assertTrue(actual_check) @istest def check_metadata_ko(self): actual_check = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'author': 'someone', }) self.assertFalse(actual_check) @istest def check_metadata_ko2(self): actual_check = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'foobar', }) self.assertFalse(actual_check) diff --git a/swh/deposit/tests/api/test_deposit_read_metadata.py b/swh/deposit/tests/api/test_deposit_read_metadata.py index 4536c03d..657d61ed 100644 --- a/swh/deposit/tests/api/test_deposit_read_metadata.py +++ b/swh/deposit/tests/api/test_deposit_read_metadata.py @@ -1,214 +1,214 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from django.core.urlresolvers import reverse from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.models import Deposit from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS from swh.deposit.config import DEPOSIT_STATUS_PARTIAL from ...config import SWH_PERSON from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine class DepositReadMetadataTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): """Deposit access to read metadata information on deposit. """ @istest def read_metadata(self): """Private metadata read api to existing deposit should return metadata """ deposit_id = self.create_deposit_partial() url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[self.collection.name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEquals(response._headers['content-type'][1], 'application/json') data = json.loads(response.content.decode('utf-8')) expected_meta = { 'origin': { 'url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id', 'type': 'deposit' }, 'origin_metadata': { 'metadata': { '{http://www.w3.org/2005/Atom}external_identifier': 'some-external-id', '{http://www.w3.org/2005/Atom}url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id' }, 'provider': { 'provider_name': '', 'provider_type': 'deposit_client', 'provider_url': 'https://hal-test.archives-ouvertes.fr/', 'metadata': {} }, 'tool': { 'tool_name': 'swh-deposit', 'tool_version': '0.0.1', 'tool_configuration': { 'sword_version': '2' } } }, 'revision': { 'synthetic': True, 'committer_date': None, 'message': ': Deposit %s in collection hal' % deposit_id, 'author': SWH_PERSON, 'committer': SWH_PERSON, 'date': None, 'metadata': { '{http://www.w3.org/2005/Atom}external_identifier': 'some-external-id', '{http://www.w3.org/2005/Atom}url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id' }, 'type': 'tar' }, 'occurrence': { 'branch': 'master' } } self.assertEquals(data, expected_meta) @istest def read_metadata_revision_with_parent(self): """Private read metadata to a deposit (with parent) returns metadata """ swh_id = 'da78a9d4cf1d5d29873693fd496142e3a18c20fa' deposit_id1 = self.create_deposit_with_status( status=DEPOSIT_STATUS_LOAD_SUCCESS, external_id='some-external-id', swh_id=swh_id) deposit_parent = Deposit.objects.get(pk=deposit_id1) self.assertEquals(deposit_parent.swh_id, swh_id) self.assertEquals(deposit_parent.external_id, 'some-external-id') self.assertEquals(deposit_parent.status, DEPOSIT_STATUS_LOAD_SUCCESS) deposit_id = self.create_deposit_partial( external_id='some-external-id') deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.external_id, 'some-external-id') self.assertEquals(deposit.swh_id, None) self.assertEquals(deposit.parent, deposit_parent) self.assertEquals(deposit.status, DEPOSIT_STATUS_PARTIAL) url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[self.collection.name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEquals(response._headers['content-type'][1], 'application/json') data = json.loads(response.content.decode('utf-8')) expected_meta = { 'origin': { 'url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id', 'type': 'deposit' }, 'origin_metadata': { 'metadata': { '{http://www.w3.org/2005/Atom}external_identifier': 'some-external-id', '{http://www.w3.org/2005/Atom}url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id' }, 'provider': { 'provider_name': '', 'provider_type': 'deposit_client', 'provider_url': 'https://hal-test.archives-ouvertes.fr/', 'metadata': {} }, 'tool': { 'tool_name': 'swh-deposit', 'tool_version': '0.0.1', 'tool_configuration': { 'sword_version': '2' } } }, 'revision': { 'synthetic': True, 'date': None, 'committer_date': None, 'author': SWH_PERSON, 'committer': SWH_PERSON, 'type': 'tar', 'message': ': Deposit %s in collection hal' % deposit_id, 'metadata': { '{http://www.w3.org/2005/Atom}external_identifier': 'some-external-id', '{http://www.w3.org/2005/Atom}url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id' }, 'parents': [swh_id] }, 'occurrence': { 'branch': 'master' } } self.assertEquals(data, expected_meta) @istest def access_to_nonexisting_deposit_returns_404_response(self): """Read unknown collection should return a 404 response """ unknown_id = '999' url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[self.collection.name, unknown_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Deposit with id %s does not exist' % unknown_id, response.content.decode('utf-8')) @istest def access_to_nonexisting_collection_returns_404_response(self): """Read unknown deposit should return a 404 response """ collection_name = 'non-existing' deposit_id = self.create_deposit_partial() url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[collection_name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Unknown collection name %s' % collection_name, response.content.decode('utf-8'),) diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py index d83ed977..d6493538 100644 --- a/swh/deposit/tests/common.py +++ b/swh/deposit/tests/common.py @@ -1,464 +1,464 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import hashlib import os import shutil import tempfile from django.core.urlresolvers import reverse from django.test import TestCase from io import BytesIO from nose.plugins.attrib import attr from rest_framework import status from swh.deposit.config import COL_IRI, EM_IRI, EDIT_SE_IRI from swh.deposit.models import DepositClient, DepositCollection, Deposit from swh.deposit.models import DepositRequest from swh.deposit.models import DepositRequestType from swh.deposit.parsers import parse_xml from swh.deposit.settings.testing import MEDIA_ROOT from swh.core import tarball def create_arborescence_zip(root_path, archive_name, filename, content, up_to_size=None): """Build an archive named archive_name in the root_path. This archive contains one file named filename with the content content. Returns: dict with the keys: - dir: the directory of that archive - path: full path to the archive - sha1sum: archive's sha1sum - length: archive's length """ os.makedirs(root_path, exist_ok=True) archive_path_dir = tempfile.mkdtemp(dir=root_path) dir_path = os.path.join(archive_path_dir, archive_name) os.mkdir(dir_path) filepath = os.path.join(dir_path, filename) l = len(content) count = 0 batch_size = 128 with open(filepath, 'wb') as f: f.write(content) if up_to_size: # fill with blank content up to a given size count += l while count < up_to_size: f.write(b'0'*batch_size) count += batch_size zip_path = dir_path + '.zip' zip_path = tarball.compress(zip_path, 'zip', dir_path) with open(zip_path, 'rb') as f: length = 0 sha1sum = hashlib.sha1() md5sum = hashlib.md5() data = b'' for chunk in f: sha1sum.update(chunk) md5sum.update(chunk) length += len(chunk) data += chunk return { 'dir': archive_path_dir, 'name': archive_name, 'data': data, 'path': zip_path, 'sha1sum': sha1sum.hexdigest(), 'md5sum': md5sum.hexdigest(), 'length': length, } @attr('fs') class FileSystemCreationRoutine(TestCase): """Mixin intended for tests needed to tamper with archives. """ def setUp(self): """Define the test client and other test variables.""" super().setUp() self.root_path = '/tmp/swh-deposit/test/build-zip/' os.makedirs(self.root_path, exist_ok=True) self.archive = create_arborescence_zip( self.root_path, 'archive1', 'file1', b'some content in file') self.atom_entry = b""" Awesome Compiler urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author https://hal-test.archives-ouvertes.fr """ def tearDown(self): super().tearDown() shutil.rmtree(self.root_path) def create_simple_binary_deposit(self, status_partial=True): response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/zip', data=self.archive['data'], CONTENT_LENGTH=self.archive['length'], HTTP_MD5SUM=self.archive['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive['name'], )) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content[ '{http://www.w3.org/2005/Atom}deposit_id'] return deposit_id def create_complex_binary_deposit(self, status_partial=False): deposit_id = self.create_simple_binary_deposit( status_partial=True) # Add a second archive to the deposit # update its status to DEPOSIT_STATUS_READY response = self.client.post( reverse(EM_IRI, args=[self.collection.name, deposit_id]), content_type='application/zip', data=self.archive2['data'], CONTENT_LENGTH=self.archive2['length'], HTTP_MD5SUM=self.archive2['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip') # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content[ '{http://www.w3.org/2005/Atom}deposit_id'] return deposit_id def update_binary_deposit(self, deposit_id, status_partial=False): # update existing deposit with atom entry metadata response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.codemeta_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) # then # assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content[ '{http://www.w3.org/2005/Atom}deposit_id'] return deposit_id @attr('fs') class BasicTestCase(TestCase): """Mixin intended for data setup purposes (user, collection, etc...) """ def setUp(self): """Define the test client and other test variables.""" super().setUp() # expanding diffs in tests self.maxDiff = None # basic minimum test data deposit_request_types = {} # Add deposit request types for deposit_request_type in ['archive', 'metadata']: drt = DepositRequestType(name=deposit_request_type) drt.save() deposit_request_types[deposit_request_type] = drt _name = 'hal' _url = 'https://hal-test.archives-ouvertes.fr/' # set collection up _collection = DepositCollection(name=_name) _collection.save() # set user/client up _client = DepositClient.objects.create_user(username=_name, password=_name, url=_url) _client.collections = [_collection.id] _client.save() self.collection = _collection self.user = _client self.username = _name self.userpass = _name self.deposit_request_types = deposit_request_types def tearDown(self): super().tearDown() # Clean up uploaded files in temporary directory (tests have # their own media root folder) if os.path.exists(MEDIA_ROOT): for d in os.listdir(MEDIA_ROOT): shutil.rmtree(os.path.join(MEDIA_ROOT, d)) class WithAuthTestCase(TestCase): """Mixin intended for testing the api with basic authentication. """ def setUp(self): super().setUp() _token = '%s:%s' % (self.username, self.userpass) token = base64.b64encode(_token.encode('utf-8')) authorization = 'Basic %s' % token.decode('utf-8') self.client.credentials(HTTP_AUTHORIZATION=authorization) def tearDown(self): super().tearDown() self.client.credentials() class CommonCreationRoutine(TestCase): """Mixin class to share initialization routine. cf: `class`:test_deposit_update.DepositReplaceExistingDataTest `class`:test_deposit_update.DepositUpdateDepositWithNewDataTest `class`:test_deposit_update.DepositUpdateFailuresTest `class`:test_deposit_delete.DepositDeleteTest """ def setUp(self): super().setUp() self.atom_entry_data0 = b""" some-external-id https://hal-test.archives-ouvertes.fr/some-external-id """ self.atom_entry_data1 = b""" anotherthing https://hal-test.archives-ouvertes.fr/anotherthing """ self.atom_entry_data2 = b""" Awesome Compiler urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author https://hal-test.archives-ouvertes.fr/id """ self.codemeta_entry_data0 = b""" Awesome Compiler https://hal-test.archives-ouvertes.fr/1785io25c695 urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author description key-word 1 """ self.codemeta_entry_data1 = b""" Composing a Web of Audio Applications hal hal-01243065 hal-01243065 https://hal-test.archives-ouvertes.fr/hal-01243065 test DSP programming,Web 2017-05-03T16:08:47+02:00 this is the description 1 phpstorm stable php python C GNU General Public License v3.0 only CeCILL Free Software License Agreement v1.1 HAL hal@ccsd.cnrs.fr Morane Gruenpeter """ def create_invalid_deposit(self, external_id='some-external-id-1'): url = reverse(COL_IRI, args=[self.collection.name]) data = b'some data which is clearly not a zip file' md5sum = hashlib.md5(data).hexdigest() # when response = self.client.post( url, content_type='application/zip', # as zip data=data, # + headers CONTENT_LENGTH=len(data), # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=md5sum, HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content[ '{http://www.w3.org/2005/Atom}deposit_id'] return deposit_id def create_deposit_with_status( self, status, external_id='some-external-id-1', swh_id=None): deposit_id = self.create_invalid_deposit(external_id) # We cannot create some form of deposit with a given status in # test context ('rejected' for example). As flipped off the # checks in the configuration so all deposits have the status # ready-for-checks). Update in place the deposit with such # status deposit = Deposit.objects.get(pk=deposit_id) deposit.status = status if swh_id: deposit.swh_id = swh_id deposit.save() return deposit_id def create_simple_deposit_partial(self, external_id='some-external-id'): """Create a simple deposit (1 request) in `partial` state and returns its new identifier. Returns: deposit id """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data0, HTTP_SLUG=external_id, HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content[ '{http://www.w3.org/2005/Atom}deposit_id'] return deposit_id def create_deposit_partial_with_data_in_args(self, data): """Create a simple deposit (1 request) in `partial` state with the data or metadata as an argument and returns its new identifier. Args: data: atom entry Returns: deposit id """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=data, HTTP_SLUG='external-id', HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content[ '{http://www.w3.org/2005/Atom}deposit_id'] return deposit_id def _update_deposit_with_status(self, deposit_id, status_partial=False): """Add to a given deposit another archive and update its current status to `ready-for-checks` (by default). Returns: deposit id """ # when response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) # then assert response.status_code == status.HTTP_201_CREATED return deposit_id def create_deposit_ready(self, external_id='some-external-id'): """Create a complex deposit (2 requests) in status `ready-for-checks`. """ deposit_id = self.create_simple_deposit_partial( external_id=external_id) deposit_id = self._update_deposit_with_status(deposit_id) return deposit_id def create_deposit_partial(self, external_id='some-external-id'): """Create a complex deposit (2 requests) in status `partial`. """ deposit_id = self.create_simple_deposit_partial( external_id=external_id) deposit_id = self._update_deposit_with_status( deposit_id, status_partial=True) return deposit_id def add_metadata_to_deposit(self, deposit_id, status_partial=False): """Add metadata to deposit. """ # when response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.codemeta_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) assert response.status_code == status.HTTP_201_CREATED # then deposit = Deposit.objects.get(pk=deposit_id) assert deposit is not None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert deposit_requests is not [] for dr in deposit_requests: if dr.type.name == 'metadata': assert deposit_requests[0].metadata is not {} return deposit_id