diff --git a/swh/deposit/api/private/deposit_read.py b/swh/deposit/api/private/deposit_read.py
index 02fd80e0..6eb81822 100644
--- a/swh/deposit/api/private/deposit_read.py
+++ b/swh/deposit/api/private/deposit_read.py
@@ -1,239 +1,239 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import os
import shutil
import tempfile
from contextlib import contextmanager
from django.http import FileResponse
from rest_framework import status
from swh.core import tarball
from swh.model import identifiers
from ...config import SWH_PERSON
from ..common import SWHGetDepositAPI, SWHPrivateAPIView
from ...models import Deposit, DepositRequest
@contextmanager
def aggregate_tarballs(extraction_dir, archive_paths):
"""Aggregate multiple tarballs into one and returns this new archive's
path.
Args:
extraction_dir (path): Path to use for the tarballs computation
archive_paths ([str]): Deposit's archive paths
Returns:
Tuple (directory to clean up, archive path (aggregated or not))
"""
if len(archive_paths) > 1: # need to rebuild one archive
# from multiple ones
os.makedirs(extraction_dir, 0o755, exist_ok=True)
dir_path = tempfile.mkdtemp(prefix='swh.deposit-',
dir=extraction_dir)
# root folder to build an aggregated tarball
aggregated_tarball_rootdir = os.path.join(dir_path, 'aggregate')
os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True)
# uncompress in a temporary location all archives
for archive_path in archive_paths:
tarball.uncompress(archive_path, aggregated_tarball_rootdir)
# Aggregate into one big tarball the multiple smaller ones
temp_tarpath = tarball.compress(
aggregated_tarball_rootdir + '.zip',
nature='zip',
dirpath_or_files=aggregated_tarball_rootdir)
# can already clean up temporary directory
shutil.rmtree(aggregated_tarball_rootdir)
try:
yield temp_tarpath
finally:
shutil.rmtree(dir_path)
else: # only 1 archive, no need to do fancy actions (and no cleanup step)
yield archive_paths[0]
class SWHDepositReadArchives(SWHGetDepositAPI, SWHPrivateAPIView):
"""Dedicated class to read a deposit's raw archives content.
Only GET is supported.
"""
ADDITIONAL_CONFIG = {
'extraction_dir': ('str', '/tmp/swh-deposit/archive/'),
}
def __init__(self):
super().__init__()
self.extraction_dir = self.config['extraction_dir']
if not os.path.exists(self.extraction_dir):
os.makedirs(self.extraction_dir)
def retrieve_archives(self, deposit_id):
"""Given a deposit identifier, returns its associated archives' path.
Yields:
path to deposited archives
"""
deposit = Deposit.objects.get(pk=deposit_id)
deposit_requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive']).order_by('id')
for deposit_request in deposit_requests:
yield deposit_request.archive.path
def process_get(self, req, collection_name, deposit_id):
"""Build a unique tarball from the multiple received and stream that
content to the client.
Args:
req (Request):
collection_name (str): Collection owning the deposit
deposit_id (id): Deposit concerned by the reading
Returns:
Tuple status, stream of content, content-type
"""
archive_paths = list(self.retrieve_archives(deposit_id))
with aggregate_tarballs(self.extraction_dir,
archive_paths) as path:
return FileResponse(open(path, 'rb'),
status=status.HTTP_200_OK,
content_type='application/octet-stream')
class SWHDepositReadMetadata(SWHGetDepositAPI, SWHPrivateAPIView):
"""Class in charge of aggregating metadata on a deposit.
"""
ADDITIONAL_CONFIG = {
'provider': ('dict', {
# 'provider_name': '', # those are not set since read from the
# 'provider_url': '', # deposit's client
'provider_type': 'deposit_client',
'metadata': {}
}),
'tool': ('dict', {
'name': 'swh-deposit',
'version': '0.0.1',
'configuration': {
'sword_version': '2'
}
})
}
def __init__(self):
super().__init__()
self.provider = self.config['provider']
self.tool = self.config['tool']
def _aggregate_metadata(self, deposit, metadata_requests):
"""Retrieve and aggregates metadata information.
"""
metadata = {}
for req in metadata_requests:
metadata.update(req.metadata)
return metadata
def _retrieve_url(self, deposit, metadata):
client_domain = deposit.client.domain
for field in metadata:
if 'url' in field:
if client_domain in metadata[field]:
return metadata[field]
def aggregate(self, deposit, requests):
"""Aggregate multiple data on deposit into one unified data dictionary.
Args:
deposit (Deposit): Deposit concerned by the data aggregation.
requests ([DepositRequest]): List of associated requests which
need aggregation.
Returns:
Dictionary of data representing the deposit to inject in swh.
"""
data = {}
# Retrieve tarballs/metadata information
metadata = self._aggregate_metadata(deposit, requests)
# create origin_url from metadata only after deposit_check validates it
origin_url = self._retrieve_url(deposit, metadata)
# Read information metadata
data['origin'] = {
'type': 'deposit',
'url': origin_url
}
# revision
- fullname = deposit.client.get_full_name()
+ fullname = deposit.client.username
author_committer = SWH_PERSON
# metadata provider
self.provider['provider_name'] = deposit.client.last_name
self.provider['provider_url'] = deposit.client.provider_url
revision_type = 'tar'
revision_msg = '%s: Deposit %s in collection %s' % (
fullname, deposit.id, deposit.collection.name)
complete_date = identifiers.normalize_timestamp(deposit.complete_date)
data['revision'] = {
'synthetic': True,
'date': complete_date,
'committer_date': complete_date,
'author': author_committer,
'committer': author_committer,
'type': revision_type,
'message': revision_msg,
'metadata': metadata,
}
if deposit.parent:
swh_persistent_id = deposit.parent.swh_id
persistent_identifier = identifiers.parse_persistent_identifier(
swh_persistent_id)
parent_revision = persistent_identifier['object_id']
data['revision']['parents'] = [parent_revision]
data['occurrence'] = {
'branch': 'master'
}
data['origin_metadata'] = {
'provider': self.provider,
'tool': self.tool,
'metadata': metadata
}
return data
def process_get(self, req, collection_name, deposit_id):
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata'])
data = self.aggregate(deposit, requests)
d = {}
if data:
d = json.dumps(data)
return status.HTTP_200_OK, d, 'application/json'
diff --git a/swh/deposit/tests/api/test_deposit_read_metadata.py b/swh/deposit/tests/api/test_deposit_read_metadata.py
index e46682c4..068f34b8 100644
--- a/swh/deposit/tests/api/test_deposit_read_metadata.py
+++ b/swh/deposit/tests/api/test_deposit_read_metadata.py
@@ -1,215 +1,215 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from django.core.urlresolvers import reverse
from nose.tools import istest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.models import Deposit
from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA
from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS
from swh.deposit.config import DEPOSIT_STATUS_PARTIAL
from ...config import SWH_PERSON
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
class DepositReadMetadataTest(APITestCase, WithAuthTestCase, BasicTestCase,
CommonCreationRoutine):
"""Deposit access to read metadata information on deposit.
"""
@istest
def read_metadata(self):
"""Private metadata read api to existing deposit should return metadata
"""
deposit_id = self.create_deposit_partial()
url = reverse(PRIVATE_GET_DEPOSIT_METADATA,
args=[self.collection.name, deposit_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_200_OK)
self.assertEquals(response._headers['content-type'][1],
'application/json')
data = json.loads(response.content.decode('utf-8'))
expected_meta = {
'origin': {
'url': 'https://hal-test.archives-ouvertes.fr/' +
'some-external-id',
'type': 'deposit'
},
'origin_metadata': {
'metadata': {
'{http://www.w3.org/2005/Atom}external_identifier':
'some-external-id',
'{http://www.w3.org/2005/Atom}url':
'https://hal-test.archives-ouvertes.fr/' +
'some-external-id'
},
'provider': {
- 'provider_name': '',
+ 'provider_name': 'hal',
'provider_type': 'deposit_client',
'provider_url': 'https://hal-test.archives-ouvertes.fr/',
'metadata': {}
},
'tool': {
'tool_name': 'swh-deposit',
'tool_version': '0.0.1',
'tool_configuration': {
'sword_version': '2'
}
}
},
'revision': {
'synthetic': True,
'committer_date': None,
- 'message': ': Deposit %s in collection hal' % deposit_id,
+ 'message': 'hal: Deposit %s in collection hal' % deposit_id,
'author': SWH_PERSON,
'committer': SWH_PERSON,
'date': None,
'metadata': {
'{http://www.w3.org/2005/Atom}external_identifier':
'some-external-id',
'{http://www.w3.org/2005/Atom}url':
'https://hal-test.archives-ouvertes.fr/' +
'some-external-id'
},
'type': 'tar'
},
'occurrence': {
'branch': 'master'
}
}
self.assertEquals(data, expected_meta)
@istest
def read_metadata_revision_with_parent(self):
"""Private read metadata to a deposit (with parent) returns metadata
"""
swh_id = 'da78a9d4cf1d5d29873693fd496142e3a18c20fa'
swh_persistent_id = 'swh:1:rev:%s' % swh_id
deposit_id1 = self.create_deposit_with_status(
status=DEPOSIT_STATUS_LOAD_SUCCESS,
external_id='some-external-id',
swh_id=swh_persistent_id)
deposit_parent = Deposit.objects.get(pk=deposit_id1)
self.assertEquals(deposit_parent.swh_id, swh_persistent_id)
self.assertEquals(deposit_parent.external_id, 'some-external-id')
self.assertEquals(deposit_parent.status, DEPOSIT_STATUS_LOAD_SUCCESS)
deposit_id = self.create_deposit_partial(
external_id='some-external-id')
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEquals(deposit.external_id, 'some-external-id')
self.assertEquals(deposit.swh_id, None)
self.assertEquals(deposit.parent, deposit_parent)
self.assertEquals(deposit.status, DEPOSIT_STATUS_PARTIAL)
url = reverse(PRIVATE_GET_DEPOSIT_METADATA,
args=[self.collection.name, deposit_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_200_OK)
self.assertEquals(response._headers['content-type'][1],
'application/json')
data = json.loads(response.content.decode('utf-8'))
expected_meta = {
'origin': {
'url': 'https://hal-test.archives-ouvertes.fr/' +
'some-external-id',
'type': 'deposit'
},
'origin_metadata': {
'metadata': {
'{http://www.w3.org/2005/Atom}external_identifier':
'some-external-id',
'{http://www.w3.org/2005/Atom}url':
'https://hal-test.archives-ouvertes.fr/' +
'some-external-id'
},
'provider': {
- 'provider_name': '',
+ 'provider_name': 'hal',
'provider_type': 'deposit_client',
'provider_url': 'https://hal-test.archives-ouvertes.fr/',
'metadata': {}
},
'tool': {
'tool_name': 'swh-deposit',
'tool_version': '0.0.1',
'tool_configuration': {
'sword_version': '2'
}
}
},
'revision': {
'synthetic': True,
'date': None,
'committer_date': None,
'author': SWH_PERSON,
'committer': SWH_PERSON,
'type': 'tar',
- 'message': ': Deposit %s in collection hal' % deposit_id,
+ 'message': 'hal: Deposit %s in collection hal' % deposit_id,
'metadata': {
'{http://www.w3.org/2005/Atom}external_identifier':
'some-external-id',
'{http://www.w3.org/2005/Atom}url':
'https://hal-test.archives-ouvertes.fr/' +
'some-external-id'
},
'parents': [swh_id]
},
'occurrence': {
'branch': 'master'
}
}
self.assertEquals(data, expected_meta)
@istest
def access_to_nonexisting_deposit_returns_404_response(self):
"""Read unknown collection should return a 404 response
"""
unknown_id = '999'
url = reverse(PRIVATE_GET_DEPOSIT_METADATA,
args=[self.collection.name, unknown_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_404_NOT_FOUND)
self.assertIn('Deposit with id %s does not exist' % unknown_id,
response.content.decode('utf-8'))
@istest
def access_to_nonexisting_collection_returns_404_response(self):
"""Read unknown deposit should return a 404 response
"""
collection_name = 'non-existing'
deposit_id = self.create_deposit_partial()
url = reverse(PRIVATE_GET_DEPOSIT_METADATA,
args=[collection_name, deposit_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_404_NOT_FOUND)
self.assertIn('Unknown collection name %s' % collection_name,
response.content.decode('utf-8'),)
diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py
index fae9364d..d17c836d 100644
--- a/swh/deposit/tests/common.py
+++ b/swh/deposit/tests/common.py
@@ -1,466 +1,467 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import hashlib
import os
import shutil
import tempfile
from django.core.urlresolvers import reverse
from django.test import TestCase
from io import BytesIO
from nose.plugins.attrib import attr
from rest_framework import status
from swh.deposit.config import COL_IRI, EM_IRI, EDIT_SE_IRI
from swh.deposit.models import DepositClient, DepositCollection, Deposit
from swh.deposit.models import DepositRequest
from swh.deposit.models import DepositRequestType
from swh.deposit.parsers import parse_xml
from swh.deposit.settings.testing import MEDIA_ROOT
from swh.core import tarball
def create_arborescence_zip(root_path, archive_name, filename, content,
up_to_size=None):
"""Build an archive named archive_name in the root_path.
This archive contains one file named filename with the content content.
Returns:
dict with the keys:
- dir: the directory of that archive
- path: full path to the archive
- sha1sum: archive's sha1sum
- length: archive's length
"""
os.makedirs(root_path, exist_ok=True)
archive_path_dir = tempfile.mkdtemp(dir=root_path)
dir_path = os.path.join(archive_path_dir, archive_name)
os.mkdir(dir_path)
filepath = os.path.join(dir_path, filename)
_length = len(content)
count = 0
batch_size = 128
with open(filepath, 'wb') as f:
f.write(content)
if up_to_size: # fill with blank content up to a given size
count += _length
while count < up_to_size:
f.write(b'0'*batch_size)
count += batch_size
zip_path = dir_path + '.zip'
zip_path = tarball.compress(zip_path, 'zip', dir_path)
with open(zip_path, 'rb') as f:
length = 0
sha1sum = hashlib.sha1()
md5sum = hashlib.md5()
data = b''
for chunk in f:
sha1sum.update(chunk)
md5sum.update(chunk)
length += len(chunk)
data += chunk
return {
'dir': archive_path_dir,
'name': archive_name,
'data': data,
'path': zip_path,
'sha1sum': sha1sum.hexdigest(),
'md5sum': md5sum.hexdigest(),
'length': length,
}
@attr('fs')
class FileSystemCreationRoutine(TestCase):
"""Mixin intended for tests needed to tamper with archives.
"""
def setUp(self):
"""Define the test client and other test variables."""
super().setUp()
self.root_path = '/tmp/swh-deposit/test/build-zip/'
os.makedirs(self.root_path, exist_ok=True)
self.archive = create_arborescence_zip(
self.root_path, 'archive1', 'file1', b'some content in file')
self.atom_entry = b"""
Awesome Compiler
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
2017-10-07T15:17:08Z
some awesome author
https://hal-test.archives-ouvertes.fr
"""
def tearDown(self):
super().tearDown()
shutil.rmtree(self.root_path)
def create_simple_binary_deposit(self, status_partial=True):
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/zip',
data=self.archive['data'],
CONTENT_LENGTH=self.archive['length'],
HTTP_MD5SUM=self.archive['md5sum'],
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial,
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive['name'], ))
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
return deposit_id
def create_complex_binary_deposit(self, status_partial=False):
deposit_id = self.create_simple_binary_deposit(
status_partial=True)
# Add a second archive to the deposit
# update its status to DEPOSIT_STATUS_VERIFIED
response = self.client.post(
reverse(EM_IRI, args=[self.collection.name, deposit_id]),
content_type='application/zip',
data=self.archive2['data'],
CONTENT_LENGTH=self.archive2['length'],
HTTP_MD5SUM=self.archive2['md5sum'],
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial,
HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip')
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
return deposit_id
def update_binary_deposit(self, deposit_id, status_partial=False):
# update existing deposit with atom entry metadata
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.codemeta_entry_data1,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial)
# then
# assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
return deposit_id
@attr('fs')
class BasicTestCase(TestCase):
"""Mixin intended for data setup purposes (user, collection, etc...)
"""
def setUp(self):
"""Define the test client and other test variables."""
super().setUp()
# expanding diffs in tests
self.maxDiff = None
# basic minimum test data
deposit_request_types = {}
# Add deposit request types
for deposit_request_type in ['archive', 'metadata']:
drt = DepositRequestType(name=deposit_request_type)
drt.save()
deposit_request_types[deposit_request_type] = drt
_name = 'hal'
_provider_url = 'https://hal-test.archives-ouvertes.fr/'
_domain = 'archives-ouvertes.fr/'
# set collection up
_collection = DepositCollection(name=_name)
_collection.save()
# set user/client up
_client = DepositClient.objects.create_user(username=_name,
password=_name,
provider_url=_provider_url,
domain=_domain)
_client.collections = [_collection.id]
+ _client.last_name = _name
_client.save()
self.collection = _collection
self.user = _client
self.username = _name
self.userpass = _name
self.deposit_request_types = deposit_request_types
def tearDown(self):
super().tearDown()
# Clean up uploaded files in temporary directory (tests have
# their own media root folder)
if os.path.exists(MEDIA_ROOT):
for d in os.listdir(MEDIA_ROOT):
shutil.rmtree(os.path.join(MEDIA_ROOT, d))
class WithAuthTestCase(TestCase):
"""Mixin intended for testing the api with basic authentication.
"""
def setUp(self):
super().setUp()
_token = '%s:%s' % (self.username, self.userpass)
token = base64.b64encode(_token.encode('utf-8'))
authorization = 'Basic %s' % token.decode('utf-8')
self.client.credentials(HTTP_AUTHORIZATION=authorization)
def tearDown(self):
super().tearDown()
self.client.credentials()
class CommonCreationRoutine(TestCase):
"""Mixin class to share initialization routine.
cf:
`class`:test_deposit_update.DepositReplaceExistingDataTest
`class`:test_deposit_update.DepositUpdateDepositWithNewDataTest
`class`:test_deposit_update.DepositUpdateFailuresTest
`class`:test_deposit_delete.DepositDeleteTest
"""
def setUp(self):
super().setUp()
self.atom_entry_data0 = b"""
some-external-id
https://hal-test.archives-ouvertes.fr/some-external-id
"""
self.atom_entry_data1 = b"""
anotherthing
https://hal-test.archives-ouvertes.fr/anotherthing
"""
self.atom_entry_data2 = b"""
Awesome Compiler
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
2017-10-07T15:17:08Z
some awesome author
https://hal-test.archives-ouvertes.fr/id
"""
self.codemeta_entry_data0 = b"""
Awesome Compiler
https://hal-test.archives-ouvertes.fr/1785io25c695
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
2017-10-07T15:17:08Z
some awesome author
description
key-word 1
"""
self.codemeta_entry_data1 = b"""
Composing a Web of Audio Applications
hal
hal-01243065
hal-01243065
https://hal-test.archives-ouvertes.fr/hal-01243065
test
DSP programming,Web
2017-05-03T16:08:47+02:00
this is the description
1
phpstorm
stable
php
python
C
GNU General Public License v3.0 only
CeCILL Free Software License Agreement v1.1
HAL
hal@ccsd.cnrs.fr
Morane Gruenpeter
"""
def create_invalid_deposit(self, external_id='some-external-id-1'):
url = reverse(COL_IRI, args=[self.collection.name])
data = b'some data which is clearly not a zip file'
md5sum = hashlib.md5(data).hexdigest()
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=data,
# + headers
CONTENT_LENGTH=len(data),
# other headers needs HTTP_ prefix to be taken into account
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=md5sum,
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
return deposit_id
def create_deposit_with_status(
self, status, external_id='some-external-id-1', swh_id=None):
deposit_id = self.create_invalid_deposit(external_id)
# We cannot create some form of deposit with a given status in
# test context ('rejected' for example). As flipped off the
# checks in the configuration so all deposits have the status
# deposited). Update in place the deposit with such
# status
deposit = Deposit.objects.get(pk=deposit_id)
deposit.status = status
if swh_id:
deposit.swh_id = swh_id
deposit.save()
return deposit_id
def create_simple_deposit_partial(self, external_id='some-external-id'):
"""Create a simple deposit (1 request) in `partial` state and returns
its new identifier.
Returns:
deposit id
"""
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0,
HTTP_SLUG=external_id,
HTTP_IN_PROGRESS='true')
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
return deposit_id
def create_deposit_partial_with_data_in_args(self, data):
"""Create a simple deposit (1 request) in `partial` state with the data
or metadata as an argument and returns its new identifier.
Args:
data: atom entry
Returns:
deposit id
"""
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=data,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS='true')
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content[
'{http://www.w3.org/2005/Atom}deposit_id']
return deposit_id
def _update_deposit_with_status(self, deposit_id, status_partial=False):
"""Add to a given deposit another archive and update its current
status to `deposited` (by default).
Returns:
deposit id
"""
# when
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial)
# then
assert response.status_code == status.HTTP_201_CREATED
return deposit_id
def create_deposit_ready(self, external_id='some-external-id'):
"""Create a complex deposit (2 requests) in status `deposited`.
"""
deposit_id = self.create_simple_deposit_partial(
external_id=external_id)
deposit_id = self._update_deposit_with_status(deposit_id)
return deposit_id
def create_deposit_partial(self, external_id='some-external-id'):
"""Create a complex deposit (2 requests) in status `partial`.
"""
deposit_id = self.create_simple_deposit_partial(
external_id=external_id)
deposit_id = self._update_deposit_with_status(
deposit_id, status_partial=True)
return deposit_id
def add_metadata_to_deposit(self, deposit_id, status_partial=False):
"""Add metadata to deposit.
"""
# when
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.codemeta_entry_data1,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial)
assert response.status_code == status.HTTP_201_CREATED
# then
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit is not None
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
assert deposit_requests is not []
for dr in deposit_requests:
if dr.type.name == 'metadata':
assert deposit_requests[0].metadata is not {}
return deposit_id