diff --git a/docs/index.rst b/docs/index.rst index be5f1ef7..5e7338f2 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,21 +1,22 @@ .. _swh-deposit: Software Heritage Deposit ========================= .. toctree:: :maxdepth: 3 :caption: Contents: getting-started.md spec-api.md + metadata.md spec-injection.md dev-info.md sys-info.md Indices and tables ================== * :ref:`genindex` * :ref:`modindex` * :ref:`search` diff --git a/swh/deposit/api/private/deposit_read.py b/swh/deposit/api/private/deposit_read.py index 543de603..71baca87 100644 --- a/swh/deposit/api/private/deposit_read.py +++ b/swh/deposit/api/private/deposit_read.py @@ -1,204 +1,226 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import shutil import tempfile from contextlib import contextmanager from django.http import FileResponse from rest_framework import status from swh.loader.tar import tarball from swh.model import hashutil, identifiers from ..common import SWHGetDepositAPI, SWHPrivateAPIView from ...models import Deposit, DepositRequest from ...models import previous_revision_id @contextmanager def aggregate_tarballs(extraction_dir, archive_paths): """Aggregate multiple tarballs into one and returns this new archive's path. Args: extraction_dir (path): Path to use for the tarballs computation archive_paths ([str]): Deposit's archive paths Returns: Tuple (directory to clean up, archive path (aggregated or not)) """ if len(archive_paths) > 1: # need to rebuild one archive # from multiple ones os.makedirs(extraction_dir, 0o755, exist_ok=True) dir_path = tempfile.mkdtemp(prefix='swh.deposit-', dir=extraction_dir) # root folder to build an aggregated tarball aggregated_tarball_rootdir = os.path.join(dir_path, 'aggregate') os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True) # uncompress in a temporary location all archives for archive_path in archive_paths: tarball.uncompress(archive_path, aggregated_tarball_rootdir) # Aggregate into one big tarball the multiple smaller ones temp_tarpath = tarball.compress( aggregated_tarball_rootdir + '.zip', nature='zip', dirpath_or_files=aggregated_tarball_rootdir) # can already clean up temporary directory shutil.rmtree(aggregated_tarball_rootdir) try: yield temp_tarpath finally: shutil.rmtree(dir_path) else: # only 1 archive, no need to do fancy actions (and no cleanup step) yield archive_paths[0] class SWHDepositReadArchives(SWHGetDepositAPI, SWHPrivateAPIView): """Dedicated class to read a deposit's raw archives content. Only GET is supported. """ ADDITIONAL_CONFIG = { 'extraction_dir': ('str', '/tmp/swh-deposit/archive/') } def __init__(self): super().__init__() self.extraction_dir = self.config['extraction_dir'] if not os.path.exists(self.extraction_dir): os.makedirs(self.extraction_dir) def retrieve_archives(self, deposit_id): """Given a deposit identifier, returns its associated archives' path. Yields: path to deposited archives """ deposit = Deposit.objects.get(pk=deposit_id) deposit_requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']).order_by('id') for deposit_request in deposit_requests: yield deposit_request.archive.path def process_get(self, req, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. 
        Args:
            req (Request):
            collection_name (str): Collection owning the deposit
            deposit_id (id): Deposit concerned by the reading

        Returns:
            Tuple status, stream of content, content-type

        """
        archive_paths = list(self.retrieve_archives(deposit_id))
        with aggregate_tarballs(self.extraction_dir, archive_paths) as path:
            return FileResponse(open(path, 'rb'),
                                status=status.HTTP_200_OK,
                                content_type='application/octet-stream')


class SWHDepositReadMetadata(SWHGetDepositAPI, SWHPrivateAPIView):
    """Class in charge of aggregating metadata on a deposit.

    """
    def _aggregate_metadata(self, deposit, metadata_requests):
        """Retrieve and aggregate metadata information.

        """
        metadata = {}
        for req in metadata_requests:
            metadata.update(req.metadata)
        return metadata

    def aggregate(self, deposit, requests):
        """Aggregate multiple data on deposit into one unified data
           dictionary.

        Args:
            deposit (Deposit): Deposit concerned by the data aggregation.
            requests ([DepositRequest]): List of associated requests which
                need aggregation.

        Returns:
            Dictionary of data representing the deposit to inject in swh.

        """
        data = {}
        metadata_requests = []
        # Retrieve tarballs/metadata information
        metadata = self._aggregate_metadata(deposit, metadata_requests)

        # Read information metadata
        data['origin'] = {
-            'type': deposit.collection.name,
-            'url': deposit.external_id,
+            'type': 'deposit',
+            'url': deposit.client.url + deposit.external_id,
        }

        # revision
        fullname = deposit.client.get_full_name()
        author_committer = {
            'name': deposit.client.last_name,
            'fullname': fullname,
            'email': deposit.client.email,
        }

        revision_type = 'tar'
        revision_msg = '%s: Deposit %s in collection %s' % (
            fullname, deposit.id, deposit.collection.name)
        complete_date = identifiers.normalize_timestamp(deposit.complete_date)

        data['revision'] = {
            'synthetic': True,
            'date': complete_date,
            'committer_date': complete_date,
            'author': author_committer,
            'committer': author_committer,
            'type': revision_type,
            'message': revision_msg,
            'metadata': metadata,
        }

        parent_revision = previous_revision_id(deposit.swh_id)
        if parent_revision:
            data['revision'] = {
                'parents': [hashutil.hash_to_bytes(parent_revision)]
            }

        data['occurrence'] = {
            'branch': 'master'
        }

+        provider = {
+            'provider_name': deposit.client.last_name,
+            'provider_type': 'deposit_client',
+            'provider_url': deposit.client.url,
+            'metadata': {}
+        }
+
+        tool = {
+            'tool_name': 'swh-deposit',
+            'tool_version': '0.0.1',
+            'tool_configuration': {
+                'sword_version': '2'
+            }
+        }
+
+        data['origin_metadata'] = {
+            'provider': provider,
+            'tool': tool,
+            'metadata': metadata
+        }
+
        return data

    def process_get(self, req, collection_name, deposit_id):
        deposit = Deposit.objects.get(pk=deposit_id)
        requests = DepositRequest.objects.filter(
            deposit=deposit,
            type=self.deposit_request_types['metadata'])

        data = self.aggregate(deposit, requests)
        d = {}
        if data:
            d = json.dumps(data)

        return status.HTTP_200_OK, d, 'application/json'
diff --git a/swh/deposit/injection/loader.py b/swh/deposit/injection/loader.py
index 9ee652b6..8f570140 100644
--- a/swh/deposit/injection/loader.py
+++ b/swh/deposit/injection/loader.py
@@ -1,177 +1,200 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import os
import requests
import tempfile

from swh.model import hashutil
from swh.loader.tar import loader
from swh.loader.core.loader import SWHLoader
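For orientation, the loader below consumes the JSON document produced by SWHDepositReadMetadata.aggregate above. A rough sketch of its shape follows, with illustrative values only (deposit id, names and URLs are placeholders; the structure mirrors the expectations in test_deposit_read_metadata.py further down in this diff):

# Illustrative only: approximate shape of the aggregated deposit metadata
# served by the private metadata endpoint and read back by the loader.
example_deposit_metadata = {
    'origin': {
        'type': 'deposit',
        'url': 'https://hal.archives-ouvertes.fr/some-external-id',
    },
    'origin_metadata': {
        'provider': {
            'provider_name': 'hal',
            'provider_type': 'deposit_client',
            'provider_url': 'https://hal.archives-ouvertes.fr/',
            'metadata': {},
        },
        'tool': {
            'tool_name': 'swh-deposit',
            'tool_version': '0.0.1',
            'tool_configuration': {'sword_version': '2'},
        },
        'metadata': {},  # client-supplied metadata requests, merged
    },
    'revision': {
        'synthetic': True,
        'type': 'tar',
        'message': 'hal: Deposit 42 in collection hal',
        'author': {'name': 'hal', 'fullname': 'hal', 'email': ''},
        'committer': {'name': 'hal', 'fullname': 'hal', 'email': ''},
        'date': None,
        'committer_date': None,
        'metadata': {},
    },
    'occurrence': {'branch': 'master'},
}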
class DepositClient: """Deposit client to read archive, metadata or update deposit's status. """ def read_archive_to(self, archive_update_url, archive_path, log=None): """Retrieve the archive from the deposit to a local directory. Args: archive_update_url (str): The full deposit archive(s)'s raw content to retrieve locally archive_path (str): the local archive's path where to store the raw content Returns: The archive path to the local archive to load. Or None if any problem arose. """ r = requests.get(archive_update_url, stream=True) if r.ok: with open(archive_path, 'wb') as f: for chunk in r.iter_content(): f.write(chunk) return archive_path msg = 'Problem when retrieving deposit archive at %s' % ( archive_update_url, ) if log: log.error(msg) raise ValueError(msg) def read_metadata(self, metadata_url, log=None): """Retrieve the metadata information on a given deposit. Args: metadata_url (str): The full deposit metadata url to retrieve locally Returns: The dictionary of metadata for that deposit or None if any problem arose. """ r = requests.get(metadata_url) if r.ok: data = r.json() return data msg = 'Problem when retrieving metadata at %s' % metadata_url if log: log.error(msg) raise ValueError(msg) def update_status(self, update_status_url, status, revision_id=None): """Update the deposit's status. Args: update_status_url (str): the full deposit's archive status (str): The status to update the deposit with revision_id (str/None): the revision's identifier to update to """ payload = {'status': status} if revision_id: payload['revision_id'] = revision_id requests.put(update_status_url, json=payload) class DepositLoader(loader.TarLoader): """Deposit loader implementation. This is a subclass of the :class:TarLoader as the main goal of this class is to first retrieve the deposit's tarball contents as one and its associated metadata. Then provide said tarball to be loaded by the TarLoader. This will: - retrieves the deposit's archive locally - provide the archive to be loaded by the tar loader - clean up the temporary location used to retrieve the archive locally - update the deposit's status accordingly """ def __init__(self, client=None): super().__init__() if client: self.client = client else: self.client = DepositClient() def load(self, *, archive_url, deposit_meta_url, deposit_update_url): SWHLoader.load( self, archive_url=archive_url, deposit_meta_url=deposit_meta_url, deposit_update_url=deposit_update_url) def prepare(self, *, archive_url, deposit_meta_url, deposit_update_url): """Prepare the injection by first retrieving the deposit's raw archive content. """ self.deposit_update_url = deposit_update_url temporary_directory = tempfile.TemporaryDirectory() self.temporary_directory = temporary_directory archive_path = os.path.join(temporary_directory.name, 'archive.zip') archive = self.client.get_archive( archive_url, archive_path, log=self.log) metadata = self.client.get_metadata( deposit_meta_url, log=self.log) origin = metadata['origin'] visit_date = datetime.datetime.now(tz=datetime.timezone.utc) revision = metadata['revision'] occurrence = metadata['occurrence'] + self.client.update_deposit_status(deposit_update_url, 'injecting') + super().prepare(tar_path=archive, origin=origin, visit_date=visit_date, revision=revision, occurrences=[occurrence]) + def store_metadata(self): + """Storing the origin_metadata during the load processus. + + Fetching tool and metadata_provider from storage and adding the + metadata associated to the current origin. 
+ + """ + try: + tool_id = self.storage.indexer_configuration_get( + self.origin_metadata['tool']) + provider_id = self.storage.metadata_provider_get_by( + self.origin_metadata['provider']) + + self.storage.origin_metadata_add(self.origin_id, + self.visit_date, + provider_id, + tool_id, + self.origin_metadata['metadata']) + except: + self.log.exception('Problem when storing origin_metadata') + def post_load(self, success=True): """Updating the deposit's status according to its loading status. If not successful, we update its status to failure. Otherwise, we update its status to 'success' and pass along its associated revision. """ try: if not success: self.client.update_deposit_status(self.deposit_update_url, status='failure') return # first retrieve the new revision [rev_id] = self.objects['revision'].keys() if rev_id: rev_id_hex = hashutil.hash_to_hex(rev_id) # then update the deposit's status to success with its # revision-id self.client.update_deposit_status(self.deposit_update_url, status='success', revision_id=rev_id_hex) except: self.log.exception( 'Problem when trying to update the deposit\'s status') def cleanup(self): """Clean up temporary directory where we retrieved the tarball. """ super().cleanup() self.temporary_directory.cleanup() diff --git a/swh/deposit/migrations/0006_depositclient_url.py b/swh/deposit/migrations/0006_depositclient_url.py new file mode 100644 index 00000000..df96866f --- /dev/null +++ b/swh/deposit/migrations/0006_depositclient_url.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.10.7 on 2017-11-07 13:12 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('deposit', '0005_auto_20171019_1436'), + ] + + operations = [ + migrations.AddField( + model_name='depositclient', + name='url', + field=models.TextField(default='https://hal.archives-ouvertes.fr/'), + preserve_default=False, + ), + ] diff --git a/swh/deposit/models.py b/swh/deposit/models.py index def7d164..003794ec 100644 --- a/swh/deposit/models.py +++ b/swh/deposit/models.py @@ -1,231 +1,232 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Generated from: # cd swh_deposit && \ # python3 -m manage inspectdb from django.contrib.postgres.fields import JSONField, ArrayField from django.contrib.auth.models import User, UserManager from django.db import models from django.utils.timezone import now from .config import DEPOSIT_STATUS_READY, DEPOSIT_STATUS_READY_FOR_CHECKS from .config import DEPOSIT_STATUS_PARTIAL class Dbversion(models.Model): """Db version """ version = models.IntegerField(primary_key=True) release = models.DateTimeField(default=now, null=True) description = models.TextField(blank=True, null=True) class Meta: db_table = 'dbversion' def __str__(self): return str({ 'version': self.version, 'release': self.release, 'description': self.description }) """Possible status""" DEPOSIT_STATUS = [ (DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_PARTIAL), ('expired', 'expired'), (DEPOSIT_STATUS_READY_FOR_CHECKS, DEPOSIT_STATUS_READY_FOR_CHECKS), (DEPOSIT_STATUS_READY, DEPOSIT_STATUS_READY), ('rejected', 'rejected'), ('injecting', 'injecting'), ('success', 'success'), ('failure', 'failure'), ] """Possible status and the detailed meaning.""" DEPOSIT_STATUS_DETAIL = { DEPOSIT_STATUS_PARTIAL: 
'Deposit is new or partially received since it can' ' be done in multiple requests', 'expired': 'Deposit has been there too long and is now ' 'deemed ready to be garbage collected', DEPOSIT_STATUS_READY_FOR_CHECKS: 'Deposit is ready for additional checks ' '(tarball ok, etc...)', DEPOSIT_STATUS_READY: 'Deposit is fully received, checked, and ' 'ready for injection', 'rejected': 'Deposit failed the checks', 'injecting': "Injection is ongoing on swh's side", 'success': 'Injection is successful', 'failure': 'Injection is a failure', } class DepositClient(User): """Deposit client """ collections = ArrayField(models.IntegerField(), null=True) objects = UserManager() + url = models.TextField(null=False) class Meta: db_table = 'deposit_client' def __str__(self): return str({ 'id': self.id, 'collections': self.collections, 'username': super().username, }) def format_swh_id(collection_name, revision_id): """Format swh_id value before storing in swh-deposit backend. Args: collection_name (str): the collection's name revision_id (str): the revision's hash identifier Returns: The identifier as string """ return 'swh-%s-%s' % (collection_name, revision_id) def previous_revision_id(swh_id): """Compute the parent's revision id (if any) from the swh_id. Args: swh_id (id): SWH Identifier from a previous deposit. Returns: None if no parent revision is detected. The revision id's hash if any. """ if swh_id: return swh_id.split('-')[2] return None class Deposit(models.Model): """Deposit reception table """ id = models.BigAutoField(primary_key=True) # First deposit reception date reception_date = models.DateTimeField(auto_now_add=True) # Date when the deposit is deemed complete and ready for injection complete_date = models.DateTimeField(null=True) # collection concerned by the deposit collection = models.ForeignKey( 'DepositCollection', models.DO_NOTHING) # Deposit's external identifier external_id = models.TextField() # Deposit client client = models.ForeignKey('DepositClient', models.DO_NOTHING) # SWH's injection result identifier swh_id = models.TextField(blank=True, null=True) # Deposit's status regarding injection status = models.TextField( choices=DEPOSIT_STATUS, default=DEPOSIT_STATUS_PARTIAL) class Meta: db_table = 'deposit' def __str__(self): return str({ 'id': self.id, 'reception_date': self.reception_date, 'collection': self.collection.name, 'external_id': self.external_id, 'client': self.client.username, 'status': self.status }) class DepositRequestType(models.Model): """Deposit request type made by clients (either archive or metadata) """ id = models.BigAutoField(primary_key=True) name = models.TextField() class Meta: db_table = 'deposit_request_type' def __str__(self): return str({'id': self.id, 'name': self.name}) def client_directory_path(instance, filename): """Callable to upload archive in MEDIA_ROOT/user_/ Args: instance (DepositRequest): DepositRequest concerned by the upload filename (str): Filename of the uploaded file Returns: A path to be prefixed by the MEDIA_ROOT to access physically to the file uploaded. """ return 'client_{0}/{1}'.format(instance.deposit.client.id, filename) class DepositRequest(models.Model): """Deposit request associated to one deposit. 
""" id = models.BigAutoField(primary_key=True) # Deposit concerned by the request deposit = models.ForeignKey(Deposit, models.DO_NOTHING) date = models.DateTimeField(auto_now_add=True) # Deposit request information on the data to inject # this can be null when type is 'archive' metadata = JSONField(null=True) # this can be null when type is 'metadata' archive = models.FileField(null=True, upload_to=client_directory_path) type = models.ForeignKey( 'DepositRequestType', models.DO_NOTHING) class Meta: db_table = 'deposit_request' def __str__(self): meta = None if self.metadata: from json import dumps meta = dumps(self.metadata) archive_name = None if self.archive: archive_name = self.archive.name return str({ 'id': self.id, 'deposit': self.deposit, 'metadata': meta, 'archive': archive_name }) class DepositCollection(models.Model): id = models.BigAutoField(primary_key=True) # Human readable name for the collection type e.g HAL, arXiv, etc... name = models.TextField() class Meta: db_table = 'deposit_collection' def __str__(self): return str({'id': self.id, 'name': self.name}) diff --git a/swh/deposit/tests/api/test_deposit_read_metadata.py b/swh/deposit/tests/api/test_deposit_read_metadata.py index c1b79a2e..1223ae69 100644 --- a/swh/deposit/tests/api/test_deposit_read_metadata.py +++ b/swh/deposit/tests/api/test_deposit_read_metadata.py @@ -1,93 +1,109 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from django.core.urlresolvers import reverse from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine class DepositReadMetadataTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): """Deposit access to read metadata information on deposit. 
""" @istest def access_to_an_existing_deposit_returns_metadata(self): deposit_id = self.create_deposit_partial() url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[self.collection.name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEquals(response._headers['content-type'][1], 'application/json') data = json.loads(response.content.decode('utf-8')) expected_meta = { 'origin': { - 'url': 'some-external-id', - 'type': 'hal' + 'url': 'https://hal.archives-ouvertes.fr/some-external-id', + 'type': 'deposit' + }, + 'origin_metadata': { + 'metadata': {}, + 'provider': { + 'provider_name': '', + 'provider_type': 'deposit_client', + 'provider_url': 'https://hal.archives-ouvertes.fr/', + 'metadata': {} + }, + 'tool': { + 'tool_name': 'swh-deposit', + 'tool_version': '0.0.1', + 'tool_configuration': { + 'sword_version': '2' + } + } }, 'revision': { 'synthetic': True, 'committer_date': None, 'message': ': Deposit %s in collection hal' % deposit_id, 'author': { 'fullname': '', 'email': '', 'name': '' }, 'committer': { 'fullname': '', 'email': '', 'name': '' }, 'date': None, 'metadata': {}, 'type': 'tar' }, 'occurrence': { 'branch': 'master' } } self.assertEquals(data, expected_meta) @istest def access_to_nonexisting_deposit_returns_404_response(self): """Read unknown collection should return a 404 response """ unknown_id = '999' url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[self.collection.name, unknown_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Deposit with id %s does not exist' % unknown_id, response.content.decode('utf-8')) @istest def access_to_nonexisting_collection_returns_404_response(self): """Read unknown deposit should return a 404 response """ collection_name = 'non-existing' deposit_id = self.create_deposit_partial() url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[collection_name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Unknown collection name %s' % collection_name, response.content.decode('utf-8'),) diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py index 9b3bd8fe..50999a3f 100644 --- a/swh/deposit/tests/common.py +++ b/swh/deposit/tests/common.py @@ -1,310 +1,312 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import hashlib import os import shutil import tempfile from django.core.urlresolvers import reverse from django.test import TestCase from io import BytesIO from nose.plugins.attrib import attr from rest_framework import status from swh.deposit.config import COL_IRI, EM_IRI, EDIT_SE_IRI from swh.deposit.models import DepositClient, DepositCollection from swh.deposit.models import DepositRequestType from swh.deposit.parsers import parse_xml from swh.deposit.settings.testing import MEDIA_ROOT from swh.loader.tar import tarball def create_arborescence_zip(root_path, archive_name, filename, content, up_to_size=None): """Build an archive named archive_name in the root_path. This archive contains one file named filename with the content content. 
Returns: dict with the keys: - dir: the directory of that archive - path: full path to the archive - sha1sum: archive's sha1sum - length: archive's length """ os.makedirs(root_path, exist_ok=True) archive_path_dir = tempfile.mkdtemp(dir=root_path) dir_path = os.path.join(archive_path_dir, archive_name) os.mkdir(dir_path) filepath = os.path.join(dir_path, filename) l = len(content) count = 0 batch_size = 128 with open(filepath, 'wb') as f: f.write(content) if up_to_size: # fill with blank content up to a given size count += l while count < up_to_size: f.write(b'0'*batch_size) count += batch_size zip_path = dir_path + '.zip' zip_path = tarball.compress(zip_path, 'zip', dir_path) with open(zip_path, 'rb') as f: length = 0 sha1sum = hashlib.sha1() md5sum = hashlib.md5() data = b'' for chunk in f: sha1sum.update(chunk) md5sum.update(chunk) length += len(chunk) data += chunk return { 'dir': archive_path_dir, 'name': archive_name, 'data': data, 'path': zip_path, 'sha1sum': sha1sum.hexdigest(), 'md5sum': md5sum.hexdigest(), 'length': length, } @attr('fs') class FileSystemCreationRoutine(TestCase): """Mixin intended for tests needed to tamper with archives. """ def setUp(self): """Define the test client and other test variables.""" super().setUp() self.root_path = '/tmp/swh-deposit/test/build-zip/' os.makedirs(self.root_path, exist_ok=True) self.archive = create_arborescence_zip( self.root_path, 'archive1', 'file1', b'some content in file') def tearDown(self): super().tearDown() shutil.rmtree(self.root_path) def create_simple_binary_deposit(self, status_partial=False): response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/zip', data=self.archive['data'], CONTENT_LENGTH=self.archive['length'], HTTP_MD5SUM=self.archive['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive['name'], )) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content[ '{http://www.w3.org/2005/Atom}deposit_id'] return deposit_id def create_complex_binary_deposit(self, status_partial=False): deposit_id = self.create_simple_binary_deposit( status_partial=True) # Add a second archive to the deposit # update its status to DEPOSIT_STATUS_READY response = self.client.post( reverse(EM_IRI, args=[self.collection.name, deposit_id]), content_type='application/zip', data=self.archive2['data'], CONTENT_LENGTH=self.archive2['length'], HTTP_MD5SUM=self.archive2['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip') # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content[ '{http://www.w3.org/2005/Atom}deposit_id'] return deposit_id @attr('fs') class BasicTestCase(TestCase): """Mixin intended for data setup purposes (user, collection, etc...) 
""" def setUp(self): """Define the test client and other test variables.""" super().setUp() # expanding diffs in tests self.maxDiff = None # basic minimum test data deposit_request_types = {} # Add deposit request types for deposit_request_type in ['archive', 'metadata']: drt = DepositRequestType(name=deposit_request_type) drt.save() deposit_request_types[deposit_request_type] = drt _name = 'hal' + _url = 'https://hal.archives-ouvertes.fr/' # set collection up _collection = DepositCollection(name=_name) _collection.save() # set user/client up _client = DepositClient.objects.create_user(username=_name, - password=_name) + password=_name, + url=_url) _client.collections = [_collection.id] _client.save() self.collection = _collection self.user = _client self.username = _name self.userpass = _name self.deposit_request_types = deposit_request_types def tearDown(self): super().tearDown() # Clean up uploaded files in temporary directory (tests have # their own media root folder) if os.path.exists(MEDIA_ROOT): for d in os.listdir(MEDIA_ROOT): shutil.rmtree(os.path.join(MEDIA_ROOT, d)) class WithAuthTestCase(TestCase): """Mixin intended for testing the api with basic authentication. """ def setUp(self): super().setUp() _token = '%s:%s' % (self.username, self.userpass) token = base64.b64encode(_token.encode('utf-8')) authorization = 'Basic %s' % token.decode('utf-8') self.client.credentials(HTTP_AUTHORIZATION=authorization) def tearDown(self): super().tearDown() self.client.credentials() class CommonCreationRoutine(TestCase): """Mixin class to share initialization routine. cf: `class`:test_deposit_update.DepositReplaceExistingDataTest `class`:test_deposit_update.DepositUpdateDepositWithNewDataTest `class`:test_deposit_update.DepositUpdateFailuresTest `class`:test_deposit_delete.DepositDeleteTest """ def setUp(self): super().setUp() self.atom_entry_data0 = b""" some-external-id """ self.atom_entry_data1 = b""" anotherthing """ def create_deposit_with_status_rejected(self): url = reverse(COL_IRI, args=[self.collection.name]) data = b'some data which is clearly not a zip file' md5sum = hashlib.md5(data).hexdigest() external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', # as zip data=data, # + headers CONTENT_LENGTH=len(data), # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=md5sum, HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content[ '{http://www.w3.org/2005/Atom}deposit_id'] return deposit_id def create_simple_deposit_partial(self): """Create a simple deposit (1 request) in `partial` state and returns its new identifier. Returns: deposit id """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data0, HTTP_SLUG='external-id', HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content[ '{http://www.w3.org/2005/Atom}deposit_id'] return deposit_id def _update_deposit_with_status(self, deposit_id, status_partial=False): """Add to a given deposit another archive and update its current status to `ready` (by default). 
        Returns:
            deposit id

        """
        # when
        response = self.client.post(
            reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
            content_type='application/atom+xml;type=entry',
            data=self.atom_entry_data1,
            HTTP_SLUG='external-id',
            HTTP_IN_PROGRESS=status_partial)

        # then
        assert response.status_code == status.HTTP_201_CREATED
        return deposit_id

    def create_deposit_ready(self):
        """Create a complex deposit (2 requests) in status `ready`.

        """
        deposit_id = self.create_simple_deposit_partial()
        deposit_id = self._update_deposit_with_status(deposit_id)
        return deposit_id

    def create_deposit_partial(self):
        """Create a complex deposit (2 requests) in status `partial`.

        """
        deposit_id = self.create_simple_deposit_partial()
        deposit_id = self._update_deposit_with_status(
            deposit_id, status_partial=True)
        return deposit_id
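As a closing usage illustration (a sketch only, not part of the patch): a test can combine the mixins defined above to create a deposit and read back its aggregated metadata through the private API, in the spirit of DepositReadMetadataTest earlier in this diff. The class and method names here are hypothetical.

# Hypothetical test sketch reusing the mixins from swh.deposit.tests.common;
# it mirrors DepositReadMetadataTest and only checks the document's top-level
# structure.
import json

from django.core.urlresolvers import reverse
from rest_framework import status
from rest_framework.test import APITestCase

from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA
from swh.deposit.tests.common import (BasicTestCase, CommonCreationRoutine,
                                      WithAuthTestCase)


class ExampleReadMetadataTest(APITestCase, WithAuthTestCase, BasicTestCase,
                              CommonCreationRoutine):
    def test_partial_deposit_exposes_aggregated_metadata(self):
        deposit_id = self.create_deposit_partial()
        url = reverse(PRIVATE_GET_DEPOSIT_METADATA,
                      args=[self.collection.name, deposit_id])

        response = self.client.get(url)

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        data = json.loads(response.content.decode('utf-8'))
        # the aggregation always exposes these four top-level keys
        self.assertEqual(set(data),
                         {'origin', 'origin_metadata', 'revision',
                          'occurrence'})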