diff --git a/requirements-server.txt b/requirements-server.txt
index 0fd17f45..d2631e2c 100644
--- a/requirements-server.txt
+++ b/requirements-server.txt
@@ -1,2 +1,2 @@
-Django < 2.0
+Django < 3
djangorestframework
diff --git a/swh/deposit/api/common.py b/swh/deposit/api/common.py
index dbff46e0..79560007 100644
--- a/swh/deposit/api/common.py
+++ b/swh/deposit/api/common.py
@@ -1,901 +1,901 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import hashlib
from typing import Any, Tuple
from abc import ABCMeta, abstractmethod
from django.urls import reverse
from django.http import HttpResponse
from django.shortcuts import render
from django.utils import timezone
from rest_framework import status
from rest_framework.authentication import BasicAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView
from swh.model import hashutil
from swh.scheduler.utils import create_oneshot_task_dict
from ..config import (
SWHDefaultConfig, EDIT_SE_IRI, EM_IRI, CONT_FILE_IRI,
ARCHIVE_KEY, METADATA_KEY, RAW_METADATA_KEY, STATE_IRI,
DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_PARTIAL,
PRIVATE_CHECK_DEPOSIT,
DEPOSIT_STATUS_LOAD_SUCCESS, ARCHIVE_TYPE, METADATA_TYPE
)
from ..errors import (
MAX_UPLOAD_SIZE_EXCEEDED, BAD_REQUEST, ERROR_CONTENT,
CHECKSUM_MISMATCH, make_error_dict, MEDIATION_NOT_ALLOWED,
make_error_response_from_dict, FORBIDDEN,
NOT_FOUND, make_error_response, METHOD_NOT_ALLOWED,
ParserError, PARSING_ERROR
)
from ..models import (
Deposit, DepositRequest, DepositCollection,
DepositClient
)
from ..parsers import parse_xml
ACCEPT_PACKAGINGS = ['http://purl.org/net/sword/package/SimpleZip']
ACCEPT_ARCHIVE_CONTENT_TYPES = ['application/zip', 'application/x-tar']
class SWHAPIView(APIView):
    """Base API view enforcing the basic authentication check.

    Views deriving from this class authenticate requests through HTTP
    basic authentication and only serve authenticated users.

    """
    permission_classes = (IsAuthenticated,)
    authentication_classes = (BasicAuthentication,)  # type: Tuple[Any, ...]
class SWHBaseDeposit(SWHDefaultConfig, SWHAPIView, metaclass=ABCMeta):
"""Base deposit request class sharing multiple common behaviors.
"""
def _read_headers(self, req):
"""Read and unify the necessary headers from the request (those are
not stored in the same location or not properly formatted).
Args:
req (Request): Input request
Returns:
Dictionary with the following keys (some associated values may be
None):
- content-type
- content-length
- in-progress
- content-disposition
- packaging
- slug
- on-behalf-of
"""
meta = req._request.META
content_type = req.content_type
content_length = meta.get('CONTENT_LENGTH')
if content_length and isinstance(content_length, str):
content_length = int(content_length)
# final deposit if not provided
in_progress = meta.get('HTTP_IN_PROGRESS', False)
content_disposition = meta.get('HTTP_CONTENT_DISPOSITION')
if isinstance(in_progress, str):
in_progress = in_progress.lower() == 'true'
content_md5sum = meta.get('HTTP_CONTENT_MD5')
if content_md5sum:
content_md5sum = bytes.fromhex(content_md5sum)
packaging = meta.get('HTTP_PACKAGING')
slug = meta.get('HTTP_SLUG')
on_behalf_of = meta.get('HTTP_ON_BEHALF_OF')
metadata_relevant = meta.get('HTTP_METADATA_RELEVANT')
return {
'content-type': content_type,
'content-length': content_length,
'in-progress': in_progress,
'content-disposition': content_disposition,
'content-md5sum': content_md5sum,
'packaging': packaging,
'slug': slug,
'on-behalf-of': on_behalf_of,
'metadata-relevant': metadata_relevant,
}
def _compute_md5(self, filehandler):
"""Compute uploaded file's md5 sum.
Args:
filehandler (InMemoryUploadedFile): the file to compute the md5
hash
Returns:
the md5 checksum (str)
"""
h = hashlib.md5()
for chunk in filehandler:
h.update(chunk)
return h.digest()
def _deposit_put(self, req, deposit_id=None, in_progress=False,
                 external_id=None):
    """Save/Update a deposit in db.

    Without deposit identifier, a new deposit is created; its parent is
    the most recent deposit with the same external id whose status is
    load success, if any. With a deposit identifier, the existing
    deposit's completion date and status are updated.

    When the 'checks' configuration entry is set, a one-shot
    'check-deposit' task is scheduled for a deposit in deposited status
    which has no check task yet.

    Args:
        req (Request): the request, used to build the check url
        deposit_id (int): deposit identifier
        in_progress (bool): whether the deposit is still in progress
            (partial); False means the deposit is complete
        external_id (str): The external identifier to associate to
            the deposit

    Returns:
        The Deposit instance saved or updated.

    """
    is_final = in_progress is False
    complete_date = timezone.now() if is_final else None
    status_type = (DEPOSIT_STATUS_DEPOSITED if is_final
                   else DEPOSIT_STATUS_PARTIAL)

    if deposit_id:
        deposit = Deposit.objects.get(pk=deposit_id)
        # update metadata
        deposit.complete_date = complete_date
        deposit.status = status_type
    else:
        # find a deposit parent (same external id, status load to success)
        loaded = Deposit.objects.filter(
            external_id=external_id,
            status=DEPOSIT_STATUS_LOAD_SUCCESS).order_by('-id')
        try:
            parent = loaded[0:1].get()
        except Deposit.DoesNotExist:
            parent = None

        deposit = Deposit(collection=self._collection,
                          external_id=external_id,
                          complete_date=complete_date,
                          status=status_type,
                          client=self._client,
                          parent=parent)

    if self.config['checks']:
        deposit.save()  # needed to have a deposit id
        url_args = [deposit.collection.name, deposit.id]
        scheduler = self.scheduler
        needs_check = (deposit.status == DEPOSIT_STATUS_DEPOSITED and
                       not deposit.check_task_id)
        if needs_check:
            check_url = req.build_absolute_uri(
                reverse(PRIVATE_CHECK_DEPOSIT, args=url_args))
            task = create_oneshot_task_dict(
                'check-deposit', deposit_check_url=check_url)
            created = scheduler.create_tasks([task])
            deposit.check_task_id = created[0]['id']

    deposit.save()
    return deposit
def _deposit_request_put(self, deposit, deposit_request_data,
replace_metadata=False, replace_archives=False):
"""Save a deposit request with metadata attached to a deposit.
Args:
deposit (Deposit): The deposit concerned by the request
deposit_request_data (dict): The dictionary with at most 2 deposit
request types (archive, metadata) to associate to the deposit
replace_metadata (bool): Flag defining if we add or update
existing metadata to the deposit
replace_archives (bool): Flag defining if we add or update
archives to existing deposit
Returns:
None
"""
if replace_metadata:
DepositRequest.objects.filter(
deposit=deposit,
type=METADATA_TYPE).delete()
if replace_archives:
DepositRequest.objects.filter(
deposit=deposit,
type=ARCHIVE_TYPE).delete()
deposit_request = None
archive_file = deposit_request_data.get(ARCHIVE_KEY)
if archive_file:
deposit_request = DepositRequest(
type=ARCHIVE_TYPE,
deposit=deposit,
archive=archive_file)
deposit_request.save()
metadata = deposit_request_data.get(METADATA_KEY)
if metadata:
raw_metadata = deposit_request_data.get(RAW_METADATA_KEY)
deposit_request = DepositRequest(
type=METADATA_TYPE,
deposit=deposit,
metadata=metadata,
- raw_metadata=raw_metadata)
+ raw_metadata=raw_metadata.decode('utf-8'))
deposit_request.save()
assert deposit_request is not None
def _delete_archives(self, collection_name, deposit_id):
    """Delete the archive requests attached to the deposit.

    Args:
        collection_name (str): collection's name (not used here)
        deposit_id (id): identifier of the deposit to strip of archives

    Returns:
        Empty dict when ok.
        Dict with error key when the deposit does not exist.

    """
    try:
        target = Deposit.objects.get(pk=deposit_id)
    except Deposit.DoesNotExist:
        message = 'The deposit %s does not exist' % deposit_id
        return make_error_dict(NOT_FOUND, message)

    archives = DepositRequest.objects.filter(
        deposit=target, type=ARCHIVE_TYPE)
    archives.delete()
    return {}
def _delete_deposit(self, collection_name, deposit_id):
    """Delete a deposit along with all its deposit requests.

    Args:
        collection_name (str): name of the collection the deposit must
            belong to
        deposit_id (id): The deposit to delete

    Returns
        Empty dict when ok.
        Dict with error key to describe the failure (unknown deposit, or
        deposit belonging to another collection).

    """
    try:
        target = Deposit.objects.get(pk=deposit_id)
    except Deposit.DoesNotExist:
        message = 'The deposit %s does not exist' % deposit_id
        return make_error_dict(NOT_FOUND, message)

    if target.collection.name != collection_name:
        return make_error_dict(
            BAD_REQUEST,
            summary='Cannot delete a deposit from another collection',
            verbose_description=(
                "Deposit %s does not belong to the collection %s" % (
                    deposit_id, collection_name)))

    # requests first, then the deposit itself
    DepositRequest.objects.filter(deposit=target).delete()
    target.delete()
    return {}
def _check_preconditions_on(self, filehandler, md5sum,
                            content_length=None):
    """Check the preconditions on the provided file are respected, i.e.
    the announced length and/or md5 checksum match the file's content.

    Args:
        filehandler (InMemoryUploadedFile): The file to check
        md5sum (bytes): expected md5 digest of the file's content; it is
            compared with the raw digest computed from the file. The
            check is skipped when falsy.
        content_length (int): the expected length if provided. The size
            checks are skipped when falsy.

    Returns:
        Either none if no error or a dictionary with a key error
        detailing the problem.

    """
    if content_length:
        max_size = self.config['max_upload_size']
        if content_length > max_size:
            return make_error_dict(
                MAX_UPLOAD_SIZE_EXCEEDED,
                'Upload size limit exceeded (max %s bytes).' % max_size,
                'Please consider sending the archive in '
                'multiple steps.')

        if filehandler.size != content_length:
            return make_error_dict(status.HTTP_412_PRECONDITION_FAILED,
                                   'Wrong length')

    if md5sum:
        actual_md5sum = self._compute_md5(filehandler)
        if actual_md5sum != md5sum:
            sent_hex = hashutil.hash_to_hex(md5sum)
            actual_hex = hashutil.hash_to_hex(actual_md5sum)
            return make_error_dict(
                CHECKSUM_MISMATCH,
                'Wrong md5 hash',
                'The checksum sent %s and the actual checksum '
                '%s does not match.' % (sent_hex, actual_hex))

    return None
def _binary_upload(self, req, headers, collection_name, deposit_id=None,
                   replace_metadata=False, replace_archives=False):
    """Binary upload routine: deposit of one archive, without metadata.

    Args:
        req (Request): the request holding information to parse
            and inject in db
        headers (dict): request headers formatted
        collection_name (str): the associated client
        deposit_id (id): deposit identifier if provided
        replace_metadata (bool): 'Update or add' request to existing
          deposit. If False (default), this adds new metadata request to
          existing ones. Otherwise, this will replace existing metadata.
        replace_archives (bool): 'Update or add' request to existing
          deposit. If False (default), this adds new archive request to
          existing ones. Otherwise, this will replace existing archives.

    Returns:
        In the optimal case a dict with the following keys:
            - deposit_id (int): Deposit identifier
            - deposit_date (date): Deposit's reception date
            - status: Deposit's status
            - archive: name of the uploaded archive

        Otherwise, a dictionary with the key error describing the
        failure, either:
            - a bad request error when the content-length or the
              content-disposition header is missing, or when the
              packaging is not supported
            - the error reported by the precondition checks (upload size
              limit exceeded, wrong length, wrong md5 hash)

    """
    length = headers['content-length']
    if not length:
        return make_error_dict(
            BAD_REQUEST,
            'CONTENT_LENGTH header is mandatory',
            'For archive deposit, the '
            'CONTENT_LENGTH header must be sent.')

    if not headers['content-disposition']:
        return make_error_dict(
            BAD_REQUEST,
            'CONTENT_DISPOSITION header is mandatory',
            'For archive deposit, the '
            'CONTENT_DISPOSITION header must be sent.')

    packaging = headers['packaging']
    if packaging and packaging not in ACCEPT_PACKAGINGS:
        return make_error_dict(
            BAD_REQUEST,
            'Only packaging %s is supported' %
            ACCEPT_PACKAGINGS,
            'The packaging provided %s is not supported' % packaging)

    archive = req.FILES['file']

    failure = self._check_preconditions_on(
        archive, headers['content-md5sum'], length)
    if failure:
        return failure

    # actual storage of data
    deposit = self._deposit_put(req, deposit_id=deposit_id,
                                in_progress=headers['in-progress'],
                                external_id=headers['slug'])
    self._deposit_request_put(
        deposit, {ARCHIVE_KEY: archive},
        replace_metadata=replace_metadata,
        replace_archives=replace_archives)

    return {
        'deposit_id': deposit.id,
        'deposit_date': deposit.reception_date,
        'status': deposit.status,
        'archive': archive.name,
    }
def _read_metadata(self, metadata_stream):
    """Read a metadata stream and parse its content as xml.

    Args:
        metadata_stream: object exposing a read method

    Returns:
        Tuple (raw metadata as read from the stream, parsed metadata)

    """
    raw = metadata_stream.read()
    parsed = parse_xml(raw)
    return raw, parsed
def _multipart_upload(self, req, headers, collection_name,
                      deposit_id=None, replace_metadata=False,
                      replace_archives=False):
    """Multipart upload supported with exactly:
    - 1 archive (zip or x-tar)
    - 1 atom entry

    Any other combination of parts is refused with an error.

    Args:
        req (Request): the request holding information to parse
            and inject in db
        headers (dict): request headers formatted
        collection_name (str): the associated client
        deposit_id (id): deposit identifier if provided
        replace_metadata (bool): 'Update or add' request to existing
          deposit. If False (default), this adds new metadata request to
          existing ones. Otherwise, this will replace existing metadata.
        replace_archives (bool): 'Update or add' request to existing
          deposit. If False (default), this adds new archive request to
          existing ones. Otherwise, this will replace existing archives.

    Returns:
        In the optimal case a dict with the following keys:
            - deposit_id (int): Deposit identifier
            - deposit_date (date): Deposit's reception date
            - archive: name of the uploaded archive
            - status: Deposit's status

        Otherwise, a dictionary with the key error describing the
        failure, either:
            - a content error when the parts are not exactly 1 archive
              and 1 atom entry
            - the error reported by the precondition check (wrong md5
              hash)
            - a parsing error when the atom entry is malformed

    """
    external_id = headers['slug']

    content_types_present = set()

    data = {
        'application/zip': None,  # expected either zip
        'application/x-tar': None,  # or x-tar
        'application/atom+xml': None,
    }
    for fh in req.FILES.values():
        if fh.content_type in content_types_present:
            return make_error_dict(
                ERROR_CONTENT,
                'Only 1 application/zip (or application/x-tar) archive '
                'and 1 atom+xml entry is supported (as per sword2.0 '
                'specification)',
                'You provided more than 1 application/(zip|x-tar) '
                'or more than 1 application/atom+xml content-disposition '
                'header in the multipart deposit')

        content_types_present.add(fh.content_type)
        data[fh.content_type] = fh

    filehandler = data['application/zip'] or data['application/x-tar']
    atom_entry = data['application/atom+xml']

    # Counting distinct content types is not enough: 2 archives (zip and
    # x-tar) or 1 expected part plus 1 part of an unsupported type also
    # amount to 2, which used to crash later on a missing (None) part.
    if (len(content_types_present) != 2
            or filehandler is None or atom_entry is None):
        return make_error_dict(
            ERROR_CONTENT,
            'You must provide both 1 application/zip (or '
            'application/x-tar) and 1 atom+xml entry for multipart '
            'deposit',
            'You need to provide only 1 application/(zip|x-tar) '
            'and 1 application/atom+xml content-disposition header '
            'in the multipart deposit')

    precondition_status_response = self._check_preconditions_on(
        filehandler,
        headers['content-md5sum'])

    if precondition_status_response:
        return precondition_status_response

    try:
        raw_metadata, metadata = self._read_metadata(atom_entry)
    except ParserError:
        return make_error_dict(
            PARSING_ERROR,
            'Malformed xml metadata',
            "The xml received is malformed. "
            "Please ensure your metadata file is correctly formatted.")

    # actual storage of data
    deposit = self._deposit_put(req, deposit_id=deposit_id,
                                in_progress=headers['in-progress'],
                                external_id=external_id)
    deposit_request_data = {
        ARCHIVE_KEY: filehandler,
        METADATA_KEY: metadata,
        RAW_METADATA_KEY: raw_metadata,
    }
    self._deposit_request_put(
        deposit, deposit_request_data, replace_metadata, replace_archives)

    return {
        'deposit_id': deposit.id,
        'deposit_date': deposit.reception_date,
        'archive': filehandler.name,
        'status': deposit.status,
    }
def _atom_entry(self, req, headers, collection_name,
                deposit_id=None,
                replace_metadata=False,
                replace_archives=False):
    """Atom entry deposit: deposit of metadata only, without archive.

    The external identifier is read from the metadata
    ('external_identifier' key), falling back to the slug header.

    Args:
        req (Request): the request holding information to parse
            and inject in db
        headers (dict): request headers formatted
        collection_name (str): the associated client
        deposit_id (id): deposit identifier if provided
        replace_metadata (bool): 'Update or add' request to existing
          deposit. If False (default), this adds new metadata request to
          existing ones. Otherwise, this will replace existing metadata.
        replace_archives (bool): 'Update or add' request to existing
          deposit. If False (default), this adds new archive request to
          existing ones. Otherwise, this will replace existing archives.

    Returns:
        In the optimal case a dict with the following keys:
            - deposit_id: deposit id associated to the deposit
            - deposit_date: deposit's reception date
            - archive: None (no archive is provided here)
            - status: deposit's status

        Otherwise, a dictionary with the key error describing the
        failure, either:
            - a bad request error when the xml is malformed
            - a bad request error when the parsed metadata is empty

    """
    try:
        raw, parsed = self._read_metadata(req.data)
    except ParserError:
        return make_error_dict(
            BAD_REQUEST,
            'Malformed xml metadata',
            "The xml received is malformed. "
            "Please ensure your metadata file is correctly formatted.")

    if not parsed:
        return make_error_dict(
            BAD_REQUEST,
            'Empty body request is not supported',
            'Atom entry deposit is supposed to send for metadata. '
            'If the body is empty, there is no metadata.')

    identifier = parsed.get('external_identifier', headers['slug'])

    deposit = self._deposit_put(req, deposit_id=deposit_id,
                                in_progress=headers['in-progress'],
                                external_id=identifier)

    request_data = {METADATA_KEY: parsed, RAW_METADATA_KEY: raw}
    self._deposit_request_put(
        deposit, request_data, replace_metadata, replace_archives)

    return {
        'deposit_id': deposit.id,
        'deposit_date': deposit.reception_date,
        'archive': None,
        'status': deposit.status,
    }
def _empty_post(self, req, headers, collection_name, deposit_id):
    """Empty post finalizing an existing deposit: the deposit is marked
    as complete (now) and its status becomes deposited.

    Args:
        req (Request): the request holding information to parse
            and inject in db
        headers (dict): request headers formatted
        collection_name (str): the associated client
        deposit_id (id): deposit identifier

    Returns:
        Dictionary of result with the deposit's id, the date
        it was completed, its status and no archive.

    """
    finalized = Deposit.objects.get(pk=deposit_id)
    finalized.complete_date = timezone.now()
    finalized.status = DEPOSIT_STATUS_DEPOSITED
    finalized.save()

    result = {
        'deposit_id': deposit_id,
        'deposit_date': finalized.complete_date,
        'status': finalized.status,
        'archive': None,
    }
    return result
def _make_iris(self, req, collection_name, deposit_id):
    """Build the absolute urls of the deposit's IRI endpoints.

    Args:
        req (Request): The initial request
        collection_name (str): client/collection's name
        deposit_id (id): Deposit identifier

    Returns:
        Dictionary mapping each iri name (EM_IRI, EDIT_SE_IRI,
        CONT_FILE_IRI, STATE_IRI) to its absolute url.

    """
    iris = {}
    for iri_name in (EM_IRI, EDIT_SE_IRI, CONT_FILE_IRI, STATE_IRI):
        path = reverse(iri_name, args=[collection_name, deposit_id])
        iris[iri_name] = req.build_absolute_uri(path)
    return iris
def additional_checks(self, req, headers, collection_name,
                      deposit_id=None):
    """Hook letting child classes add their own checks.

    This default implementation checks nothing.

    Returns:
        Empty dict here. An overriding class reports a problem through
        a dict with an 'error' key detailing it.

    """
    return dict()
def checks(self, req, collection_name, deposit_id=None):
    """Run the checks common to every deposit endpoint.

    As a side effect, the collection and the client (when the request
    carries a username) are stored on the instance as _collection and
    _client.

    Args:
        req (Request): the request to check
        collection_name (str): name of the targeted collection
        deposit_id (id): deposit identifier if provided

    Returns:
        Dict with the key 'headers' (formatted request headers) when
        every check passes, otherwise the dict describing the first
        failing check.

    """
    try:
        self._collection = DepositCollection.objects.get(
            name=collection_name)
    except DepositCollection.DoesNotExist:
        return make_error_dict(
            NOT_FOUND,
            'Unknown collection name %s' % collection_name)

    username = req.user.username
    if username:  # unauthenticated request can have the username empty
        try:
            self._client = DepositClient.objects.get(username=username)
        except DepositClient.DoesNotExist:
            return make_error_dict(NOT_FOUND,
                                   'Unknown client name %s' % username)

        if self._collection.id not in self._client.collections:
            message = 'Client %s cannot access collection %s' % (
                username, collection_name)
            return make_error_dict(FORBIDDEN, message)

    if deposit_id:
        try:
            deposit = Deposit.objects.get(pk=deposit_id)
        except Deposit.DoesNotExist:
            return make_error_dict(
                NOT_FOUND,
                'Deposit with id %s does not exist' %
                deposit_id)

        access_error = self.restrict_access(req, deposit)
        if access_error:
            return access_error

    headers = self._read_headers(req)
    if headers['on-behalf-of']:
        return make_error_dict(MEDIATION_NOT_ALLOWED,
                               'Mediation is not supported.')

    extra = self.additional_checks(req, headers,
                                   collection_name, deposit_id)
    if 'error' in extra:
        return extra

    return {'headers': headers}
def restrict_access(self, req, deposit=None):
    """Refuse any request other than GET on a deposit which is not in
    partial status.

    Args:
        req (Request): the request whose method is checked
        deposit (Deposit): the deposit targeted by the request, if any

    Returns:
        None when the access is allowed, otherwise a dict describing
        the error.

    """
    if not deposit:
        return None

    if req.method == 'GET' or deposit.status == DEPOSIT_STATUS_PARTIAL:
        return None

    summary = "You can only act on deposit with status '%s'" % (
        DEPOSIT_STATUS_PARTIAL, )
    description = "This deposit has status '%s'" % deposit.status
    return make_error_dict(
        BAD_REQUEST, summary=summary,
        verbose_description=description)
def _basic_not_allowed_method(self, req, method):
    """Build the error response for an unsupported http method."""
    message = '%s method is not supported on this endpoint' % method
    return make_error_response(req, METHOD_NOT_ALLOWED, message)

# By default, no http method is supported: child classes override the
# ones their endpoint allows.

def get(self, req, *args, **kwargs):
    """GET is not supported unless overridden."""
    return self._basic_not_allowed_method(req, method='GET')

def post(self, req, *args, **kwargs):
    """POST is not supported unless overridden."""
    return self._basic_not_allowed_method(req, method='POST')

def put(self, req, *args, **kwargs):
    """PUT is not supported unless overridden."""
    return self._basic_not_allowed_method(req, method='PUT')

def delete(self, req, *args, **kwargs):
    """DELETE is not supported unless overridden."""
    return self._basic_not_allowed_method(req, method='DELETE')
class SWHGetDepositAPI(SWHBaseDeposit, metaclass=ABCMeta):
    """Mixin for class to support GET method.

    """
    def get(self, req, collection_name, deposit_id, format=None):
        """Endpoint to read a deposit's resources.

        Returns:
            The error response when a check fails. Otherwise, the
            result of process_get: turned into an http response when it
            is a tuple (status, content, content-type), returned as is
            in any other case.

        """
        checks = self.checks(req, collection_name, deposit_id)
        if 'error' in checks:
            return make_error_response_from_dict(req, checks['error'])

        result = self.process_get(req, collection_name, deposit_id)
        if not isinstance(result, tuple):
            return result

        status_code, content, content_type = result
        return HttpResponse(content,
                            status=status_code,
                            content_type=content_type)

    @abstractmethod
    def process_get(self, req, collection_name, deposit_id):
        """Routine to deal with the deposit's get processing.

        Returns:
            Tuple status, stream of content, content-type

        """
        pass
class SWHPostDepositAPI(SWHBaseDeposit, metaclass=ABCMeta):
    """Mixin for class to support POST method.

    """
    def post(self, req, collection_name, deposit_id=None, format=None):
        """Endpoint to create/add resources to deposit.

        Returns:
            The error response when a check or the processing fails.
            Otherwise, the deposit receipt (xml) with the status code
            returned by process_post and a Location header set to the
            iri designated by process_post.

        """
        checks = self.checks(req, collection_name, deposit_id)
        if 'error' in checks:
            return make_error_response_from_dict(req, checks['error'])

        headers = checks['headers']
        _status, _iri_key, data = self.process_post(
            req, headers, collection_name, deposit_id)

        error = data.get('error')
        if error:
            return make_error_response_from_dict(req, error)

        data['packagings'] = ACCEPT_PACKAGINGS
        iris = self._make_iris(req, collection_name, data['deposit_id'])
        data.update(iris)
        response = render(req, 'deposit/deposit_receipt.xml',
                          context=data,
                          content_type='application/xml',
                          status=_status)
        # Set the header through the public mapping interface of the
        # response instead of its private _headers attribute.
        response['Location'] = data[_iri_key]
        return response

    @abstractmethod
    def process_post(self, req, headers, collection_name, deposit_id=None):
        """Routine to deal with the deposit's processing.

        Returns
            Tuple of:
            - response status code (200, 201, etc...)
            - key iri (EM_IRI, EDIT_SE_IRI, etc...)
            - dictionary of the processing result

        """
        pass
class SWHPutDepositAPI(SWHBaseDeposit, metaclass=ABCMeta):
    """Mixin for class to support PUT method.

    """
    def put(self, req, collection_name, deposit_id, format=None):
        """Endpoint to update deposit resources.

        Returns:
            204 response when no error during routine occurred.
            Otherwise, the error response matching the failing check or
            the error reported by process_put.

        """
        checks = self.checks(req, collection_name, deposit_id)
        if 'error' in checks:
            return make_error_response_from_dict(req, checks['error'])

        outcome = self.process_put(
            req, checks['headers'], collection_name, deposit_id)
        failure = outcome.get('error')
        if failure:
            return make_error_response_from_dict(req, failure)

        return HttpResponse(status=status.HTTP_204_NO_CONTENT)

    @abstractmethod
    def process_put(self, req, headers, collection_name, deposit_id):
        """Routine to deal with updating a deposit in some way.

        Returns
            dictionary of the processing result

        """
        pass
class SWHDeleteDepositAPI(SWHBaseDeposit, metaclass=ABCMeta):
    """Mixin for class to support DELETE method.

    """
    def delete(self, req, collection_name, deposit_id):
        """Endpoint to delete some deposit's resources (archives, deposit).

        Returns:
            204 response when no error during routine occurred.
            Otherwise, the error response matching the failing check or
            the error reported by process_delete.

        """
        checks = self.checks(req, collection_name, deposit_id)
        if 'error' in checks:
            return make_error_response_from_dict(req, checks['error'])

        outcome = self.process_delete(req, collection_name, deposit_id)
        failure = outcome.get('error')
        if failure:
            return make_error_response_from_dict(req, failure)

        return HttpResponse(status=status.HTTP_204_NO_CONTENT)

    @abstractmethod
    def process_delete(self, req, collection_name, deposit_id):
        """Routine to delete a resource.

        This is mostly not allowed except for the
        EM_IRI (cf. .api.deposit_update.SWHUpdateArchiveDeposit)

        """
        pass
diff --git a/swh/deposit/tests/api/test_deposit.py b/swh/deposit/tests/api/test_deposit.py
index d5dfe69f..dfced699 100644
--- a/swh/deposit/tests/api/test_deposit.py
+++ b/swh/deposit/tests/api/test_deposit.py
@@ -1,189 +1,189 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import hashlib
from django.urls import reverse
from io import BytesIO
from rest_framework import status
from swh.deposit.config import (
COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_REJECTED,
DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_LOAD_SUCCESS,
DEPOSIT_STATUS_LOAD_FAILURE
)
from swh.deposit.models import Deposit
from swh.deposit.parsers import parse_xml
def test_deposit_post_will_fail_with_401(client):
    """Posting without authentication is refused with a 401 response

    """
    response = client.post(reverse(COL_IRI, args=['hal']))

    assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_access_to_another_user_collection_is_forbidden(
        authenticated_client, deposit_another_collection, deposit_user):
    """Posting to the collection of another user should return a 403

    """
    other_collection = deposit_another_collection

    response = authenticated_client.post(
        reverse(COL_IRI, args=[other_collection.name]))

    assert response.status_code == status.HTTP_403_FORBIDDEN
    expected_msg = 'Client %s cannot access collection %s' % (
        deposit_user.username, other_collection.name, )
    assert expected_msg in response.content.decode('utf-8')
def test_delete_on_col_iri_not_supported(
        authenticated_client, deposit_collection):
    """Deleting on the collection iri should return a 405 response

    """
    response = authenticated_client.delete(
        reverse(COL_IRI, args=[deposit_collection.name]))

    assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED
    body = response.content.decode('utf-8')
    assert 'DELETE method is not supported on this endpoint' in body
def create_deposit_with_rejection_status(
        authenticated_client, deposit_collection):
    """Post a payload which is not an archive as a zip deposit, then
    check the deposit is created with the rejected status.

    """
    payload = b'some data which is clearly not a zip file'
    extra_headers = {
        'CONTENT_LENGTH': len(payload),
        # other headers needs HTTP_ prefix to be taken into account
        'HTTP_SLUG': 'some-external-id-1',
        'HTTP_CONTENT_MD5': hashlib.md5(payload).hexdigest(),
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=filename0',
    }

    # when
    response = authenticated_client.post(
        reverse(COL_IRI, args=[deposit_collection.name]),
        content_type='application/zip',  # as zip
        data=payload,
        **extra_headers)

    assert response.status_code == status.HTTP_201_CREATED
    receipt = parse_xml(BytesIO(response.content))
    assert receipt['deposit_status'] == DEPOSIT_STATUS_REJECTED
def test_act_on_deposit_rejected_is_not_permitted(
        authenticated_client, deposit_collection, rejected_deposit,
        atom_dataset):
    """Acting on a rejected deposit should return a 400 response

    """
    deposit = rejected_deposit

    response = authenticated_client.post(
        reverse(EDIT_SE_IRI, args=[deposit.collection.name, deposit.id]),
        content_type='application/atom+xml;type=entry',
        data=atom_dataset['entry-data1'],
        HTTP_SLUG=deposit.external_id)

    assert response.status_code == status.HTTP_400_BAD_REQUEST
    # double-quoted literal: the message embeds single quotes, which
    # terminated the string early when it was single-quoted
    msg = "You can only act on deposit with status '%s'" % (
        DEPOSIT_STATUS_PARTIAL, )
    assert msg in response.content.decode('utf-8')
def test_add_deposit_when_partial_makes_new_deposit(
authenticated_client, deposit_collection, partial_deposit,
atom_dataset):
"""Posting deposit on collection when previous is partial makes new deposit
without parent
"""
deposit = partial_deposit
assert deposit.status == DEPOSIT_STATUS_PARTIAL
# adding a new deposit with the same external id
response = authenticated_client.post(
reverse(COL_IRI, args=[deposit_collection.name]),
content_type='application/atom+xml;type=entry',
- data=atom_dataset['entry-data0'] % deposit.external_id.encode('utf-8'),
+ data=atom_dataset['entry-data0'] % deposit.external_id,
HTTP_SLUG=deposit.external_id
)
assert response.status_code == status.HTTP_201_CREATED
# the response body is parsed as xml to read the new deposit's id
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
assert deposit_id != deposit.id # new deposit
new_deposit = Deposit.objects.get(pk=deposit_id)
assert new_deposit != deposit
# the partial deposit does not become the parent of the new one
assert new_deposit.parent is None
def test_add_deposit_when_failed_makes_new_deposit_with_no_parent(
authenticated_client, deposit_collection, failed_deposit,
atom_dataset):
"""Posting deposit on collection when previous deposit failed to load makes
new deposit with no parent
"""
deposit = failed_deposit
assert deposit.status == DEPOSIT_STATUS_LOAD_FAILURE
# adding a new deposit with the same external id as a failed deposit
# does not create any parenting chain
response = authenticated_client.post(
reverse(COL_IRI, args=[deposit_collection.name]),
content_type='application/atom+xml;type=entry',
- data=atom_dataset['entry-data0'] % deposit.external_id.encode('utf-8'),
+ data=atom_dataset['entry-data0'] % deposit.external_id,
HTTP_SLUG=deposit.external_id)
assert response.status_code == status.HTTP_201_CREATED
# the response body is parsed as xml to read the new deposit's id
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
assert deposit_id != deposit.id
new_deposit = Deposit.objects.get(pk=deposit_id)
assert new_deposit != deposit
# the failed deposit does not become the parent of the new one
assert new_deposit.parent is None
def test_add_deposit_when_done_makes_new_deposit_with_parent_old_one(
authenticated_client, deposit_collection, completed_deposit,
atom_dataset):
"""Posting deposit on collection when deposit done makes new deposit with
the done deposit as parent
"""
# given a deposit already loaded with success
deposit = completed_deposit
assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS
# adding a new deposit with the same external id as a completed deposit
# creates the parenting chain
response = authenticated_client.post(
reverse(COL_IRI, args=[deposit_collection.name]),
content_type='application/atom+xml;type=entry',
- data=atom_dataset['entry-data0'] % deposit.external_id.encode('utf-8'),
+ data=atom_dataset['entry-data0'] % deposit.external_id,
HTTP_SLUG=deposit.external_id
)
assert response.status_code == status.HTTP_201_CREATED
# the response body is parsed as xml to read the new deposit's id
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
assert deposit_id != deposit.id
new_deposit = Deposit.objects.get(pk=deposit_id)
# same collection and external id, distinct deposits chained together
assert deposit.collection == new_deposit.collection
assert deposit.external_id == new_deposit.external_id
assert new_deposit != deposit
assert new_deposit.parent == deposit
diff --git a/swh/deposit/tests/api/test_deposit_atom.py b/swh/deposit/tests/api/test_deposit_atom.py
index a8fcc532..7869133c 100644
--- a/swh/deposit/tests/api/test_deposit_atom.py
+++ b/swh/deposit/tests/api/test_deposit_atom.py
@@ -1,313 +1,312 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
from django.urls import reverse
from io import BytesIO
from rest_framework import status
from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED
from swh.deposit.models import Deposit, DepositRequest, DepositCollection
from swh.deposit.parsers import parse_xml
def test_post_deposit_atom_201_even_with_decimal(
        authenticated_client, deposit_collection, atom_dataset):
    """The 'error-with-decimal' atom entry is accepted with a 201.

    The stored metadata must expose ``codemeta:softwareVersion`` as the
    string '10.4'.

    """
    payload = atom_dataset['error-with-decimal']
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])

    response = authenticated_client.post(
        collection_url,
        content_type='application/atom+xml;type=entry',
        data=payload,
        HTTP_SLUG='external-id',
        HTTP_IN_PROGRESS='false',
    )
    assert response.status_code == status.HTTP_201_CREATED

    # the receipt carries the id of the deposit that was just created
    receipt = parse_xml(BytesIO(response.content))
    deposit = Deposit.objects.get(pk=receipt['deposit_id'])
    deposit_request = DepositRequest.objects.get(deposit=deposit)

    assert deposit_request.metadata is not None
    version = deposit_request.metadata.get('codemeta:softwareVersion')
    assert version == '10.4'
def test_post_deposit_atom_400_with_empty_body(
        authenticated_client, deposit_collection, atom_dataset):
    """An atom deposit whose body is empty is answered with a 400.

    """
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])
    empty_body = atom_dataset['entry-data-empty-body']

    response = authenticated_client.post(
        collection_url,
        content_type='application/atom+xml;type=entry',
        data=empty_body,
    )

    assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_post_deposit_atom_400_badly_formatted_atom(
        authenticated_client, deposit_collection, atom_dataset):
    """A badly formatted atom entry is answered with a 400.

    """
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])
    malformed_entry = atom_dataset['entry-data-badly-formatted']

    response = authenticated_client.post(
        collection_url,
        content_type='application/atom+xml;type=entry',
        data=malformed_entry,
    )

    assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_post_deposit_atom_parsing_error(
        authenticated_client, deposit_collection, atom_dataset):
    """The 'parsing-error-prone' atom entry is answered with a 400.

    """
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])
    error_prone_entry = atom_dataset['entry-data-parsing-error-prone']

    response = authenticated_client.post(
        collection_url,
        content_type='application/atom+xml;type=entry',
        data=error_prone_entry,
    )

    assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_post_deposit_atom_no_slug_header(
        authenticated_client, deposit_collection, atom_dataset):
    """An atom deposit sent without the SLUG header is rejected with a 400.

    """
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])

    # HTTP_SLUG is deliberately left out of the request
    request_kwargs = {
        'content_type': 'application/atom+xml;type=entry',
        'data': atom_dataset['entry-data0'],
        'HTTP_IN_PROGRESS': 'false',
    }
    response = authenticated_client.post(collection_url, **request_kwargs)

    # the error body names the missing header
    assert b'Missing SLUG header' in response.content
    assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_post_deposit_atom_unknown_collection(
        authenticated_client, atom_dataset):
    """An atom deposit on a collection that does not exist yields a 404.

    """
    missing_name = 'unknown-one'

    # precondition: no collection is registered under that name
    with pytest.raises(DepositCollection.DoesNotExist):
        DepositCollection.objects.get(name=missing_name)

    target_url = reverse(COL_IRI, args=[missing_name])
    response = authenticated_client.post(
        target_url,
        content_type='application/atom+xml;type=entry',
        data=atom_dataset['entry-data0'],
        HTTP_SLUG='something',
    )

    assert response.status_code == status.HTTP_404_NOT_FOUND
# Posts a first atom entry for a fresh external id and checks the created
# deposit and its single metadata-only deposit request.
def test_post_deposit_atom_entry_initial(
authenticated_client, deposit_collection, atom_dataset):
"""Posting an initial atom entry should return 201 with deposit receipt
"""
# given: no deposit exists yet for this external id
external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
with pytest.raises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
# The patch interpolates the external id as-is instead of its utf-8 bytes;
# presumably the 'entry-data0' template is now a str - verify against fixture.
- atom_entry_data = atom_dataset['entry-data0'] % external_id.encode('utf-8')
+ atom_entry_data = atom_dataset['entry-data0'] % external_id
# when
response = authenticated_client.post(
reverse(COL_IRI, args=[deposit_collection.name]),
content_type='application/atom+xml;type=entry',
data=atom_entry_data,
HTTP_SLUG=external_id,
HTTP_IN_PROGRESS='false')
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.collection == deposit_collection
assert deposit.external_id == external_id
assert deposit.status == DEPOSIT_STATUS_DEPOSITED
# one associated request to a deposit
deposit_request = DepositRequest.objects.get(deposit=deposit)
assert deposit_request.metadata is not None
# after the patch the stored raw metadata is compared with the posted
# payload directly, with no decode step
- assert deposit_request.raw_metadata == atom_entry_data.decode('utf-8')
+ assert deposit_request.raw_metadata == atom_entry_data
# metadata-only deposit: no archive file is attached to the request
assert bool(deposit_request.archive) is False
# Same scenario as an initial atom deposit, but the payload is built from the
# 'codemeta-sample' entry of the dataset.
def test_post_deposit_atom_entry_with_codemeta(
authenticated_client, deposit_collection, atom_dataset):
"""Posting an initial atom entry should return 201 with deposit receipt
"""
# given: no deposit exists yet for this external id
external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
with pytest.raises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
# The patch interpolates the external id as-is instead of its utf-8 bytes;
# presumably the template is now a str - verify against the fixture.
- atom_entry_data = atom_dataset['codemeta-sample'] % external_id.encode('utf-8') # noqa
-
+ atom_entry_data = atom_dataset['codemeta-sample'] % external_id
# when
response = authenticated_client.post(
reverse(COL_IRI, args=[deposit_collection.name]),
content_type='application/atom+xml;type=entry',
data=atom_entry_data,
HTTP_SLUG=external_id,
HTTP_IN_PROGRESS='false')
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.collection == deposit_collection
assert deposit.external_id == external_id
assert deposit.status == DEPOSIT_STATUS_DEPOSITED
# one associated request to a deposit
deposit_request = DepositRequest.objects.get(deposit=deposit)
assert deposit_request.metadata is not None
# after the patch the raw metadata is compared with the payload directly
- assert deposit_request.raw_metadata == atom_entry_data.decode('utf-8')
+ assert deposit_request.raw_metadata == atom_entry_data
# metadata-only deposit: no archive file is attached to the request
assert bool(deposit_request.archive) is False
# Posts the 'tei-sample' entry unchanged (no external id interpolation) and
# checks the created deposit and its single deposit request.
def test_post_deposit_atom_entry_tei(
authenticated_client, deposit_collection, atom_dataset):
"""Posting initial atom entry as TEI should return 201 with receipt
"""
# given: no deposit exists yet for this external id
external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
with pytest.raises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
atom_entry_data = atom_dataset['tei-sample']
# when
response = authenticated_client.post(
reverse(COL_IRI, args=[deposit_collection.name]),
content_type='application/atom+xml;type=entry',
data=atom_entry_data,
HTTP_SLUG=external_id,
HTTP_IN_PROGRESS='false')
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.collection == deposit_collection
assert deposit.external_id == external_id
assert deposit.status == DEPOSIT_STATUS_DEPOSITED
# one associated request to a deposit
deposit_request = DepositRequest.objects.get(deposit=deposit)
assert deposit_request.metadata is not None
# after the patch the raw metadata is compared with the payload directly
- assert deposit_request.raw_metadata == atom_entry_data.decode('utf-8')
+ assert deposit_request.raw_metadata == atom_entry_data
# metadata-only deposit: no archive file is attached to the request
assert bool(deposit_request.archive) is False
# Two-step atom deposit: a first post with In-Progress true leaves the deposit
# 'partial'; a second post on the returned location finalizes it. Both posts
# must end up as deposit requests of the same deposit.
def test_post_deposit_atom_entry_multiple_steps(
authenticated_client, deposit_collection, atom_dataset):
"""After initial deposit, updating a deposit should return a 201
"""
# given
external_id = 'urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a'
# NOTE(review): the assignment below never binds, since the lookup is
# expected to raise; 'deposit' is only set further down.
with pytest.raises(Deposit.DoesNotExist):
deposit = Deposit.objects.get(external_id=external_id)
# when
response = authenticated_client.post(
reverse(COL_IRI, args=[deposit_collection.name]),
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data1'],
HTTP_IN_PROGRESS='True',
HTTP_SLUG=external_id)
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.collection == deposit_collection
assert deposit.external_id == external_id
assert deposit.status == 'partial'
# one associated request to a deposit
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
assert len(deposit_requests) == 1
# NOTE(review): unlike the other templated payloads changed by this patch,
# this line still interpolates external_id.encode('utf-8'). If the template
# is now a str with a %s placeholder, the bytes repr (b'...') ends up in the
# XML - confirm whether .encode('utf-8') should be dropped here as well.
atom_entry_data = atom_dataset['entry-data-minimal'] % external_id.encode('utf-8') # noqa
# NOTE(review): _headers is a private attribute of the response object;
# response['Location'] looks like the public equivalent - confirm.
update_uri = response._headers['location'][1]
# when updating the first deposit post
response = authenticated_client.post(
update_uri,
content_type='application/atom+xml;type=entry',
data=atom_entry_data,
HTTP_IN_PROGRESS='False')
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.collection == deposit_collection
assert deposit.external_id == external_id
assert deposit.status == DEPOSIT_STATUS_DEPOSITED
# the second post updated the existing deposit instead of creating one
assert len(Deposit.objects.all()) == 1
# now 2 associated requests to a same deposit
deposit_requests = DepositRequest.objects.filter(
deposit=deposit).order_by('id')
assert len(deposit_requests) == 2
atom_entry_data1 = atom_dataset['entry-data1']
# expectations are listed in posting order, matching the order_by('id') above
expected_meta = [
{
'metadata': parse_xml(atom_entry_data1),
- 'raw_metadata': atom_entry_data1.decode('utf-8'),
+ 'raw_metadata': atom_entry_data1
},
{
'metadata': parse_xml(atom_entry_data),
- 'raw_metadata': atom_entry_data.decode('utf-8'),
+ 'raw_metadata': atom_entry_data
}
]
for i, deposit_request in enumerate(deposit_requests):
actual_metadata = deposit_request.metadata
assert actual_metadata == expected_meta[i]['metadata']
assert deposit_request.raw_metadata == expected_meta[i]['raw_metadata']
# atom-only requests: no archive file is attached
assert bool(deposit_request.archive) is False
diff --git a/swh/deposit/tests/api/test_deposit_binary.py b/swh/deposit/tests/api/test_deposit_binary.py
index 8f1cc763..5bcca36a 100644
--- a/swh/deposit/tests/api/test_deposit_binary.py
+++ b/swh/deposit/tests/api/test_deposit_binary.py
@@ -1,543 +1,543 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.urls import reverse
from io import BytesIO
from rest_framework import status
from swh.deposit.config import (
COL_IRI, EM_IRI, DEPOSIT_STATUS_DEPOSITED,
)
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.parsers import parse_xml
from swh.deposit.tests.common import create_arborescence_archive, check_archive
def test_post_deposit_binary_no_slug(
        authenticated_client, deposit_collection, sample_archive):
    """A binary deposit sent without the SLUG header is rejected with a 400.

    """
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])

    # headers sent with the upload; HTTP_SLUG is deliberately left out
    headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_IN_PROGRESS': 'false',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=filename0',
    }
    response = authenticated_client.post(
        collection_url,
        content_type='application/zip',
        data=sample_archive['data'],
        **headers)

    # the error body names the missing header
    assert b'Missing SLUG header' in response.content
    assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_post_deposit_binary_support(
        authenticated_client, deposit_collection, sample_archive):
    """An upload declared as application/octet-stream is refused with a 415.

    No deposit must be recorded for the external id afterwards.

    """
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])
    external_id = 'some-external-id-1'

    headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_IN_PROGRESS': 'false',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=filename0',
    }
    response = authenticated_client.post(
        collection_url,
        content_type='application/octet-stream',
        data=sample_archive['data'],
        **headers)

    assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE

    # the refused upload left no deposit behind
    with pytest.raises(Deposit.DoesNotExist):
        Deposit.objects.get(external_id=external_id)
def test_post_deposit_binary_upload_ok(
        authenticated_client, deposit_collection, sample_archive):
    """Binary upload with correct headers should return 201 with receipt

    Checks the created deposit, its single archive-only deposit request,
    the fields of the receipt and the Location header of the response.

    """
    # given
    url = reverse(COL_IRI, args=[deposit_collection.name])
    external_id = 'some-external-id-1'

    # when
    response = authenticated_client.post(
        url,
        content_type='application/zip',  # as zip
        data=sample_archive['data'],
        # + headers
        CONTENT_LENGTH=sample_archive['length'],
        # other headers needs HTTP_ prefix to be taken into account
        HTTP_SLUG=external_id,
        HTTP_CONTENT_MD5=sample_archive['md5sum'],
        HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
        HTTP_IN_PROGRESS='false',
        HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
            sample_archive['name'], ))

    # then
    # check the status before parsing, so that a refused upload is reported
    # as a status mismatch rather than as a receipt parsing problem
    assert response.status_code == status.HTTP_201_CREATED
    response_content = parse_xml(BytesIO(response.content))

    deposit_id = response_content['deposit_id']
    deposit = Deposit.objects.get(pk=deposit_id)
    assert deposit.status == DEPOSIT_STATUS_DEPOSITED
    assert deposit.external_id == external_id
    assert deposit.collection == deposit_collection
    assert deposit.swh_id is None

    # archive-only deposit: one request, without any metadata
    deposit_request = DepositRequest.objects.get(deposit=deposit)
    check_archive(sample_archive['name'], deposit_request.archive.name)
    assert deposit_request.metadata is None
    assert deposit_request.raw_metadata is None

    # the receipt (parsed once, above) describes the deposit
    assert response_content['deposit_archive'] == sample_archive['name']
    assert int(response_content['deposit_id']) == deposit.id
    assert response_content['deposit_status'] == deposit.status

    # read the header through the public mapping interface of the response
    # instead of its private _headers attribute
    edit_se_iri = reverse('edit_se_iri',
                          args=[deposit_collection.name, deposit.id])
    assert response['Location'] == 'http://testserver' + edit_se_iri
def test_post_deposit_binary_failure_unsupported_packaging_header(
        authenticated_client, deposit_collection, sample_archive):
    """A binary deposit with an unsupported Packaging header returns 400.

    No deposit must be recorded for the external id afterwards.

    """
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])
    external_id = 'some-external-id'

    headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        # the value under test
        'HTTP_PACKAGING': 'something-unsupported',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=filename0',
    }
    response = authenticated_client.post(
        collection_url,
        content_type='application/zip',
        data=sample_archive['data'],
        **headers)

    assert response.status_code == status.HTTP_400_BAD_REQUEST

    # the refused upload left no deposit behind
    with pytest.raises(Deposit.DoesNotExist):
        Deposit.objects.get(external_id=external_id)
def test_post_deposit_binary_upload_no_content_disposition_header(
        authenticated_client, deposit_collection, sample_archive):
    """A binary upload lacking Content-Disposition is rejected with a 400.

    No deposit must be recorded for the external id afterwards.

    """
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])
    external_id = 'some-external-id'

    # HTTP_CONTENT_DISPOSITION is deliberately left out
    headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_IN_PROGRESS': 'false',
    }
    response = authenticated_client.post(
        collection_url,
        content_type='application/zip',
        data=sample_archive['data'],
        **headers)

    assert response.status_code == status.HTTP_400_BAD_REQUEST

    # the refused upload left no deposit behind
    with pytest.raises(Deposit.DoesNotExist):
        Deposit.objects.get(external_id=external_id)
def test_post_deposit_mediation_not_supported(
        authenticated_client, deposit_collection, sample_archive):
    """A binary upload carrying On-Behalf-Of is refused with a 412.

    No deposit must be recorded for the external id afterwards.

    """
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])
    external_id = 'some-external-id-1'

    headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_IN_PROGRESS': 'false',
        # the header under test
        'HTTP_ON_BEHALF_OF': 'someone',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=filename0',
    }
    response = authenticated_client.post(
        collection_url,
        content_type='application/zip',
        data=sample_archive['data'],
        **headers)

    assert response.status_code == status.HTTP_412_PRECONDITION_FAILED

    # the refused upload left no deposit behind
    with pytest.raises(Deposit.DoesNotExist):
        Deposit.objects.get(external_id=external_id)
def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded(
        authenticated_client, deposit_collection, sample_archive, tmp_path):
    """An archive built with ``up_to_size=500`` is refused with a 413.

    The error body must mention the exceeded limit and no deposit must be
    recorded for the external id afterwards.

    """
    workdir = str(tmp_path)
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])

    oversized = create_arborescence_archive(
        workdir, 'archive2', 'file2', b'some content in file',
        up_to_size=500)
    external_id = 'some-external-id'

    headers = {
        'CONTENT_LENGTH': oversized['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': oversized['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_IN_PROGRESS': 'false',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=filename0',
    }
    response = authenticated_client.post(
        collection_url,
        content_type='application/zip',
        data=oversized['data'],
        **headers)

    assert response.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE
    assert b'Upload size limit exceeded' in response.content

    # the refused upload left no deposit behind
    with pytest.raises(Deposit.DoesNotExist):
        Deposit.objects.get(external_id=external_id)
def test_post_deposit_2_post_2_different_deposits(
        authenticated_client, deposit_collection, sample_archive):
    """2 posting deposits should return 2 different 201 with receipt

    The same archive is posted twice under two different external ids;
    each post must create its own deposit.

    """
    url = reverse(COL_IRI, args=[deposit_collection.name])

    # when
    response = authenticated_client.post(
        url,
        content_type='application/zip',  # as zip
        data=sample_archive['data'],
        # + headers
        CONTENT_LENGTH=sample_archive['length'],
        HTTP_SLUG='some-external-id-1',
        HTTP_CONTENT_MD5=sample_archive['md5sum'],
        HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
        HTTP_IN_PROGRESS='false',
        HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')

    # then
    assert response.status_code == status.HTTP_201_CREATED

    response_content = parse_xml(BytesIO(response.content))
    deposit_id = response_content['deposit_id']

    deposit = Deposit.objects.get(pk=deposit_id)

    deposits = Deposit.objects.all()
    assert len(deposits) == 1
    assert deposits[0] == deposit

    # second post
    response = authenticated_client.post(
        url,
        content_type='application/x-tar',  # as tar
        data=sample_archive['data'],
        # + headers
        CONTENT_LENGTH=sample_archive['length'],
        HTTP_SLUG='another-external-id',
        HTTP_CONTENT_MD5=sample_archive['md5sum'],
        HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
        HTTP_IN_PROGRESS='false',
        HTTP_CONTENT_DISPOSITION='attachment; filename=filename1')

    assert response.status_code == status.HTTP_201_CREATED

    response_content = parse_xml(BytesIO(response.content))
    deposit_id2 = response_content['deposit_id']

    deposit2 = Deposit.objects.get(pk=deposit_id2)

    assert deposit != deposit2

    deposits = Deposit.objects.all().order_by('id')
    assert len(deposits) == 2
    # Compare the actual rows, in creation order. The former
    # "assert list(deposits), [...]" form treated the second operand as the
    # assertion message, so it only checked that the list was non-empty.
    assert list(deposits) == [deposit, deposit2]
def test_post_deposit_binary_and_post_to_add_another_archive(
        authenticated_client, deposit_collection, sample_archive, tmp_path):
    """A partial binary deposit accepts a second archive and gets finalized.

    The first upload is sent with In-Progress true and leaves the deposit
    'partial'; posting a second archive on the EM IRI returns a 201 and
    moves the deposit to the deposited status, with two archive requests.

    """
    workdir = str(tmp_path)
    collection_url = reverse(COL_IRI, args=[deposit_collection.name])
    external_id = 'some-external-id-1'
    packaging = 'http://purl.org/net/sword/package/SimpleZip'

    # first upload, explicitly left in progress
    first_headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        'HTTP_PACKAGING': packaging,
        'HTTP_IN_PROGRESS': 'true',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=%s' % (
            sample_archive['name'], ),
    }
    response = authenticated_client.post(
        collection_url,
        content_type='application/zip',
        data=sample_archive['data'],
        **first_headers)

    assert response.status_code == status.HTTP_201_CREATED
    receipt = parse_xml(BytesIO(response.content))
    deposit_id = receipt['deposit_id']

    deposit = Deposit.objects.get(pk=deposit_id)
    assert deposit.status == 'partial'
    assert deposit.external_id == external_id
    assert deposit.collection == deposit_collection
    assert deposit.swh_id is None

    first_request = DepositRequest.objects.get(deposit=deposit)
    assert first_request.deposit == deposit
    assert first_request.type == 'archive'
    check_archive(sample_archive['name'], first_request.archive.name)

    # second archive, posted on the deposit's EM IRI without In-Progress
    archive2 = create_arborescence_archive(
        workdir, 'archive2', 'file2', b'some other content in file')
    update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id])

    second_headers = {
        'CONTENT_LENGTH': archive2['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': archive2['md5sum'],
        'HTTP_PACKAGING': packaging,
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=%s' % (
            archive2['name']),
    }
    response = authenticated_client.post(
        update_uri,
        content_type='application/zip',
        data=archive2['data'],
        **second_headers)

    assert response.status_code == status.HTTP_201_CREATED
    # the receipt is only parsed here; its content is not inspected
    receipt = parse_xml(BytesIO(response.content))

    deposit = Deposit.objects.get(pk=deposit_id)
    assert deposit.status == DEPOSIT_STATUS_DEPOSITED
    assert deposit.external_id == external_id
    assert deposit.collection == deposit_collection
    assert deposit.swh_id is None

    requests_by_id = list(
        DepositRequest.objects.filter(deposit=deposit).order_by('id'))

    # both uploads are attached to the same deposit, in posting order
    assert len(requests_by_id) == 2
    assert requests_by_id[0].deposit == deposit
    assert requests_by_id[0].type == 'archive'
    check_archive(sample_archive['name'], requests_by_id[0].archive.name)

    assert requests_by_id[1].deposit == deposit
    assert requests_by_id[1].type == 'archive'
    check_archive(archive2['name'], requests_by_id[1].archive.name)

    # and no extra deposit was created along the way
    all_deposits = Deposit.objects.all()
    assert len(all_deposits) == 1
# Creates a finalized binary deposit, then checks that every update/add
# endpoint (archive PUT/POST, atom PUT/POST, multipart PUT/POST) answers 400.
def test_post_deposit_then_update_refused(
authenticated_client, deposit_collection,
sample_archive, atom_dataset, tmp_path):
"""Updating a deposit with status 'ready' should return a 400
"""
tmp_path = str(tmp_path)
url = reverse(COL_IRI, args=[deposit_collection.name])
external_id = 'some-external-id-1'
# when
response = authenticated_client.post(
url,
content_type='application/zip', # as zip
data=sample_archive['data'],
# + headers
CONTENT_LENGTH=sample_archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=sample_archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
# NOTE(review): the docstring and comments below call this state 'ready',
# while the assertion checks DEPOSIT_STATUS_DEPOSITED - confirm the wording.
assert deposit.status == DEPOSIT_STATUS_DEPOSITED
assert deposit.external_id == external_id
assert deposit.collection == deposit_collection
assert deposit.swh_id is None
deposit_request = DepositRequest.objects.get(deposit=deposit)
assert deposit_request.deposit == deposit
check_archive('filename0', deposit_request.archive.name)
# updating/adding is forbidden
# uri to update the content
edit_se_iri = reverse(
'edit_se_iri', args=[deposit_collection.name, deposit_id])
em_iri = reverse(
'em_iri', args=[deposit_collection.name, deposit_id])
# Testing all update/add endpoint should fail
# since the status is ready
archive2 = create_arborescence_archive(
tmp_path, 'archive2', 'file2', b'some content in file 2')
# replacing file is no longer possible since the deposit's
# status is ready
r = authenticated_client.put(
em_iri,
content_type='application/zip',
data=archive2['data'],
CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
assert r.status_code == status.HTTP_400_BAD_REQUEST
# adding file is no longer possible since the deposit's status
# is ready
r = authenticated_client.post(
em_iri,
content_type='application/zip',
data=archive2['data'],
CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
assert r.status_code == status.HTTP_400_BAD_REQUEST
# replacing metadata is no longer possible since the deposit's
# status is ready
r = authenticated_client.put(
edit_se_iri,
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data-deposit-binary'],
CONTENT_LENGTH=len(atom_dataset['entry-data-deposit-binary']),
HTTP_SLUG=external_id)
assert r.status_code == status.HTTP_400_BAD_REQUEST
# adding new metadata is no longer possible since the
# deposit's status is ready
r = authenticated_client.post(
edit_se_iri,
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data-deposit-binary'],
CONTENT_LENGTH=len(atom_dataset['entry-data-deposit-binary']),
HTTP_SLUG=external_id)
assert r.status_code == status.HTTP_400_BAD_REQUEST
# in-memory uploads used as the two parts of the multipart requests below
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(
BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/zip',
size=len(archive_content),
charset=None)
atom_entry = InMemoryUploadedFile(
# the patch encodes the dataset entry before wrapping it in BytesIO;
# presumably the entry is now a str - verify against the fixture
- BytesIO(atom_dataset['entry-data-deposit-binary']),
+ BytesIO(atom_dataset['entry-data-deposit-binary'].encode('utf-8')),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
# NOTE(review): size is taken from the un-encoded entry while the buffer
# holds its utf-8 bytes; the two differ if the entry has non-ASCII
# characters - confirm.
size=len(atom_dataset['entry-data-deposit-binary']),
charset='utf-8')
# replacing multipart metadata is no longer possible since the
# deposit's status is ready
r = authenticated_client.put(
edit_se_iri,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
})
assert r.status_code == status.HTTP_400_BAD_REQUEST
# adding new metadata is no longer possible since the
# deposit's status is ready
r = authenticated_client.post(
edit_se_iri,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
})
assert r.status_code == status.HTTP_400_BAD_REQUEST
diff --git a/swh/deposit/tests/api/test_deposit_multipart.py b/swh/deposit/tests/api/test_deposit_multipart.py
index d9420f8d..ac73597c 100644
--- a/swh/deposit/tests/api/test_deposit_multipart.py
+++ b/swh/deposit/tests/api/test_deposit_multipart.py
@@ -1,389 +1,391 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.urls import reverse
from io import BytesIO
from rest_framework import status
from swh.deposit.config import (
COL_IRI, DEPOSIT_STATUS_DEPOSITED
)
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.parsers import parse_xml
from swh.deposit.tests.common import check_archive
# A multipart deposit (archive + atom entry) posted without the SLUG header
# must be answered with a 400 whose body names the missing header.
def test_post_deposit_multipart_without_slug_header_is_bad_request(
authenticated_client, deposit_collection, atom_dataset):
# given
url = reverse(COL_IRI, args=[deposit_collection.name])
# archive part: arbitrary bytes declared as a zip upload
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(
BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/zip',
size=len(archive_content),
charset=None)
data_atom_entry = atom_dataset['entry-data-deposit-binary']
atom_entry = InMemoryUploadedFile(
# the patch encodes the dataset entry before wrapping it in BytesIO;
# presumably the entry is now a str - verify against the fixture
- BytesIO(data_atom_entry),
+ BytesIO(data_atom_entry.encode('utf-8')),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
# NOTE(review): size is taken from the un-encoded entry while the buffer
# holds its utf-8 bytes; the two differ if the entry has non-ASCII
# characters - confirm.
size=len(data_atom_entry),
charset='utf-8')
# when
response = authenticated_client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
# (HTTP_SLUG is deliberately not sent)
HTTP_IN_PROGRESS='false')
assert b'Missing SLUG header' in response.content
assert response.status_code == status.HTTP_400_BAD_REQUEST
# Posts the sample archive (declared as zip) together with an atom entry in
# one multipart request and checks the two resulting deposit requests.
def test_post_deposit_multipart_zip(
authenticated_client, deposit_collection,
atom_dataset, sample_archive):
"""one multipart deposit (zip+xml) should be accepted
"""
# given
url = reverse(COL_IRI, args=[deposit_collection.name])
archive = InMemoryUploadedFile(
BytesIO(sample_archive['data']),
field_name=sample_archive['name'],
name=sample_archive['name'],
content_type='application/zip',
size=sample_archive['length'],
charset=None)
data_atom_entry = atom_dataset['entry-data-deposit-binary']
atom_entry = InMemoryUploadedFile(
# the patch encodes the dataset entry before wrapping it in BytesIO;
# presumably the entry is now a str - verify against the fixture
- BytesIO(data_atom_entry),
+ BytesIO(data_atom_entry.encode('utf-8')),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
# NOTE(review): size is taken from the un-encoded entry while the buffer
# holds its utf-8 bytes; the two differ if the entry has non-ASCII
# characters - confirm.
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = authenticated_client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG=external_id)
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.status == DEPOSIT_STATUS_DEPOSITED
assert deposit.external_id == external_id
assert deposit.collection == deposit_collection
assert deposit.swh_id is None
# one request per multipart part: the archive and the atom entry
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
assert len(deposit_requests) == 2
for deposit_request in deposit_requests:
assert deposit_request.deposit == deposit
if deposit_request.type == 'archive':
# the archive request carries the file and no metadata
check_archive(sample_archive['name'], deposit_request.archive.name)
assert deposit_request.metadata is None
assert deposit_request.raw_metadata is None
else:
# the other request carries the parsed and the raw atom entry
assert deposit_request.metadata['id'] == \
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
assert deposit_request.raw_metadata == \
- data_atom_entry.decode('utf-8')
+ data_atom_entry
# Posts the sample archive (declared as x-tar) together with an atom entry in
# one multipart request and checks the two resulting deposit requests.
def test_post_deposit_multipart_tar(
authenticated_client, deposit_collection,
atom_dataset, sample_archive):
"""one multipart deposit (tar+xml) should be accepted
"""
# given
url = reverse(COL_IRI, args=[deposit_collection.name])
# from django.core.files import uploadedfile
data_atom_entry = atom_dataset['entry-data-deposit-binary']
archive = InMemoryUploadedFile(
BytesIO(sample_archive['data']),
field_name=sample_archive['name'],
name=sample_archive['name'],
content_type='application/x-tar',
size=sample_archive['length'],
charset=None)
atom_entry = InMemoryUploadedFile(
# the patch encodes the dataset entry before wrapping it in BytesIO;
# presumably the entry is now a str - verify against the fixture
- BytesIO(data_atom_entry),
+ BytesIO(data_atom_entry.encode('utf-8')),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
# NOTE(review): size is taken from the un-encoded entry while the buffer
# holds its utf-8 bytes; the two differ if the entry has non-ASCII
# characters - confirm.
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = authenticated_client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG=external_id)
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.status == DEPOSIT_STATUS_DEPOSITED
assert deposit.external_id == external_id
assert deposit.collection == deposit_collection
assert deposit.swh_id is None
# one request per multipart part: the archive and the atom entry
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
assert len(deposit_requests) == 2
for deposit_request in deposit_requests:
assert deposit_request.deposit == deposit
if deposit_request.type == 'archive':
# the archive request carries the file and no metadata
check_archive(sample_archive['name'], deposit_request.archive.name)
assert deposit_request.metadata is None
assert deposit_request.raw_metadata is None
else:
# the other request carries the parsed and the raw atom entry
assert deposit_request.metadata['id'] == \
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
assert deposit_request.raw_metadata == \
- data_atom_entry.decode('utf-8')
+ data_atom_entry
def test_post_deposit_multipart_put_to_replace_metadata(
authenticated_client, deposit_collection,
atom_dataset, sample_archive):
"""One multipart deposit followed by a metadata update should be
accepted
"""
# given
url = reverse(COL_IRI, args=[deposit_collection.name])
data_atom_entry = atom_dataset['entry-data-deposit-binary']
archive = InMemoryUploadedFile(
BytesIO(sample_archive['data']),
field_name=sample_archive['name'],
name=sample_archive['name'],
content_type='application/zip',
size=sample_archive['length'],
charset=None)
atom_entry = InMemoryUploadedFile(
- BytesIO(data_atom_entry),
+ BytesIO(data_atom_entry.encode('utf-8')),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = authenticated_client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='true',
HTTP_SLUG=external_id)
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.status == 'partial'
assert deposit.external_id == external_id
assert deposit.collection == deposit_collection
assert deposit.swh_id is None
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
assert len(deposit_requests) == 2
for deposit_request in deposit_requests:
assert deposit_request.deposit == deposit
if deposit_request.type == 'archive':
check_archive(sample_archive['name'], deposit_request.archive.name)
else:
assert deposit_request.metadata['id'] == \
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
assert deposit_request.raw_metadata == \
- data_atom_entry.decode('utf-8')
+ data_atom_entry
replace_metadata_uri = response._headers['location'][1]
response = authenticated_client.put(
replace_metadata_uri,
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data-deposit-binary'],
HTTP_IN_PROGRESS='false')
assert response.status_code == status.HTTP_204_NO_CONTENT
# deposit_id did not change
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.status == DEPOSIT_STATUS_DEPOSITED
assert deposit.external_id == external_id
assert deposit.collection == deposit_collection
assert deposit.swh_id is None
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
assert len(deposit_requests) == 2
for deposit_request in deposit_requests:
assert deposit_request.deposit == deposit
if deposit_request.type == 'archive':
check_archive(sample_archive['name'], deposit_request.archive.name)
else:
assert deposit_request.metadata['id'] == \
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
assert deposit_request.raw_metadata == \
- atom_dataset['entry-data-deposit-binary'].decode('utf-8')
+ atom_dataset['entry-data-deposit-binary']
# FAILURE scenarios
def test_post_deposit_multipart_only_archive_and_atom_entry(
authenticated_client, deposit_collection):
"""Multipart deposit only accepts one archive and one atom+xml"""
# given
url = reverse(COL_IRI, args=[deposit_collection.name])
archive_content = b'some content representing archive'
- archive = InMemoryUploadedFile(BytesIO(archive_content),
- field_name='archive0',
- name='archive0',
- content_type='application/x-tar',
- size=len(archive_content),
- charset=None)
+ archive = InMemoryUploadedFile(
+ BytesIO(archive_content),
+ field_name='archive0',
+ name='archive0',
+ content_type='application/x-tar',
+ size=len(archive_content),
+ charset=None)
other_archive_content = b"some-other-content"
- other_archive = InMemoryUploadedFile(BytesIO(other_archive_content),
- field_name='atom0',
- name='atom0',
- content_type='application/x-tar',
- size=len(other_archive_content),
- charset='utf-8')
+ other_archive = InMemoryUploadedFile(
+ BytesIO(other_archive_content),
+ field_name='atom0',
+ name='atom0',
+ content_type='application/x-tar',
+ size=len(other_archive_content),
+ charset='utf-8')
# when
response = authenticated_client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': other_archive,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG='external-id')
# then
assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE
assert 'Only 1 application/zip (or application/x-tar) archive' in \
response.content.decode('utf-8')
# when
archive.seek(0)
response = authenticated_client.post(
url,
format='multipart',
data={
'archive': archive,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG='external-id')
# then
assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE
assert (
'You must provide both 1 application/zip (or '
'application/x-tar) and 1 atom+xml entry for '
'multipart deposit' in response.content.decode('utf-8')
) is True
def test_post_deposit_multipart_400_when_badly_formatted_xml(
authenticated_client, deposit_collection,
sample_archive, atom_dataset):
# given
url = reverse(COL_IRI, args=[deposit_collection.name])
archive_content = sample_archive['data']
archive = InMemoryUploadedFile(
BytesIO(archive_content),
field_name=sample_archive['name'],
name=sample_archive['name'],
content_type='application/zip',
size=len(archive_content),
charset=None)
data_atom_entry_ko = atom_dataset['entry-data-ko']
atom_entry = InMemoryUploadedFile(
- BytesIO(data_atom_entry_ko),
+ BytesIO(data_atom_entry_ko.encode('utf-8')),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry_ko),
charset='utf-8')
# when
response = authenticated_client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG='external-id',
)
assert b'Malformed xml metadata' in response.content
assert response.status_code == status.HTTP_400_BAD_REQUEST
diff --git a/swh/deposit/tests/api/test_deposit_private_read_metadata.py b/swh/deposit/tests/api/test_deposit_private_read_metadata.py
index c1e4ae02..3738eebf 100644
--- a/swh/deposit/tests/api/test_deposit_private_read_metadata.py
+++ b/swh/deposit/tests/api/test_deposit_private_read_metadata.py
@@ -1,601 +1,601 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.urls import reverse
from rest_framework import status
from swh.deposit.models import Deposit
from swh.deposit.config import (
PRIVATE_GET_DEPOSIT_METADATA, SWH_PERSON, EDIT_SE_IRI
)
PRIVATE_GET_DEPOSIT_METADATA_NC = PRIVATE_GET_DEPOSIT_METADATA + '-nc'
def private_get_raw_url_endpoints(collection, deposit):
"""There are 2 endpoints to check (one with collection, one without)"""
deposit_id = deposit if isinstance(deposit, int) else deposit.id
return [
reverse(PRIVATE_GET_DEPOSIT_METADATA,
args=[collection.name, deposit_id]),
reverse(PRIVATE_GET_DEPOSIT_METADATA_NC,
args=[deposit_id])
]
def update_deposit(authenticated_client, collection, deposit, atom_dataset):
for atom_data in ['entry-data2', 'entry-data3']:
update_deposit_with_metadata(
authenticated_client, collection, deposit, atom_dataset[atom_data]
)
return deposit
def update_deposit_with_metadata(authenticated_client, collection, deposit,
metadata):
# update deposit's metadata
response = authenticated_client.post(
reverse(EDIT_SE_IRI, args=[collection.name, deposit.id]),
content_type='application/atom+xml;type=entry',
data=metadata,
HTTP_SLUG=deposit.external_id,
HTTP_IN_PROGRESS=True)
assert response.status_code == status.HTTP_201_CREATED
return deposit
def test_read_metadata(
authenticated_client, deposit_collection, partial_deposit,
atom_dataset):
"""Private metadata read api to existing deposit should return metadata
"""
deposit = partial_deposit
deposit.external_id = 'some-external-id'
deposit.save()
deposit = update_deposit(authenticated_client, deposit_collection, deposit,
atom_dataset)
for url in private_get_raw_url_endpoints(deposit_collection, deposit):
response = authenticated_client.get(url)
assert response.status_code == status.HTTP_200_OK
assert response._headers['content-type'][1] == 'application/json'
data = response.json()
expected_meta = {
'branch_name': 'master',
'origin': {
'type': 'deposit',
'url': 'https://hal-test.archives-ouvertes.fr/some-external-id'
},
'origin_metadata': {
'metadata': {
'@xmlns': ['http://www.w3.org/2005/Atom'],
'author': [
'some awesome author',
'another one',
'no one'
],
'codemeta:dateCreated': '2017-10-07T15:17:08Z',
'external_identifier': 'some-external-id',
'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' # noqa
},
'provider': {
'metadata': {},
'provider_name': '',
'provider_type': 'deposit_client',
'provider_url': 'https://hal-test.archives-ouvertes.fr/'
},
'tool': {
'configuration': {'sword_version': '2'},
'name': 'swh-deposit',
'version': '0.0.1'
}
},
'revision': {
'author': SWH_PERSON,
'committer': SWH_PERSON,
'committer_date': {
'negative_utc': False,
'offset': 0,
'timestamp': {
'microseconds': 0,
'seconds': 1507389428
}
},
'date': {
'negative_utc': False,
'offset': 0,
'timestamp': {'microseconds': 0, 'seconds': 1507389428}
},
'message': 'test: Deposit %s in collection test' % deposit.id,
'metadata': {
'@xmlns': ['http://www.w3.org/2005/Atom'],
'author': ['some awesome author',
'another one',
'no one'],
'codemeta:dateCreated': '2017-10-07T15:17:08Z',
'external_identifier': 'some-external-id',
'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' # noqa
},
'synthetic': True,
'type': 'tar'
}
}
assert data == expected_meta
def test_read_metadata_revision_with_parent(
authenticated_client, deposit_collection, partial_deposit,
atom_dataset):
"""Private read metadata to a deposit (with parent) returns metadata
"""
deposit = partial_deposit
deposit.external_id = 'some-external-id'
deposit.save()
deposit = update_deposit(authenticated_client, deposit_collection, deposit,
atom_dataset)
rev_id = 'da78a9d4cf1d5d29873693fd496142e3a18c20fa'
swh_id = 'swh:1:rev:%s' % rev_id
fake_parent = Deposit(swh_id=swh_id,
client=deposit.client, collection=deposit.collection)
fake_parent.save()
deposit.parent = fake_parent
deposit.save()
for url in private_get_raw_url_endpoints(deposit_collection, deposit):
response = authenticated_client.get(url)
assert response.status_code == status.HTTP_200_OK
assert response._headers['content-type'][1] == 'application/json'
data = response.json()
expected_meta = {
'branch_name': 'master',
'origin': {
'type': 'deposit',
'url': 'https://hal-test.archives-ouvertes.fr/some-external-id'
},
'origin_metadata': {
'metadata': {
'@xmlns': ['http://www.w3.org/2005/Atom'],
'author': [
'some awesome author',
'another one',
'no one'
],
'codemeta:dateCreated': '2017-10-07T15:17:08Z',
'external_identifier': 'some-external-id',
'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' # noqa
},
'provider': {
'metadata': {},
'provider_name': '',
'provider_type': 'deposit_client',
'provider_url': 'https://hal-test.archives-ouvertes.fr/'
},
'tool': {
'configuration': {'sword_version': '2'},
'name': 'swh-deposit',
'version': '0.0.1'
}
},
'revision': {
'author': SWH_PERSON,
'committer': SWH_PERSON,
'committer_date': {
'negative_utc': False,
'offset': 0,
'timestamp': {
'microseconds': 0,
'seconds': 1507389428
}
},
'date': {
'negative_utc': False,
'offset': 0,
'timestamp': {'microseconds': 0, 'seconds': 1507389428}
},
'message': 'test: Deposit %s in collection test' % deposit.id,
'metadata': {
'@xmlns': ['http://www.w3.org/2005/Atom'],
'author': ['some awesome author',
'another one',
'no one'],
'codemeta:dateCreated': '2017-10-07T15:17:08Z',
'external_identifier': 'some-external-id',
'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' # noqa
},
'synthetic': True,
'type': 'tar',
'parents': [rev_id],
}
}
assert data == expected_meta
def test_read_metadata_3(
authenticated_client, deposit_collection, partial_deposit,
atom_dataset):
"""date(Created|Published) provided, uses author/committer date
"""
deposit = partial_deposit
deposit.external_id = 'hal-01243065'
deposit.save()
deposit = update_deposit(
authenticated_client, deposit_collection, deposit,
atom_dataset)
# add metadata to the deposit with datePublished and dateCreated
- codemeta_entry_data = atom_dataset['metadata'] % b"""
+ codemeta_entry_data = atom_dataset['metadata'] % """
2015-04-06T17:08:47+02:00
2017-05-03T16:08:47+02:00
"""
update_deposit_with_metadata(
authenticated_client, deposit_collection, deposit,
codemeta_entry_data
)
for url in private_get_raw_url_endpoints(deposit_collection, deposit):
response = authenticated_client.get(url)
assert response.status_code == status.HTTP_200_OK
assert response._headers['content-type'][1] == 'application/json'
data = response.json()
metadata = {
'@xmlns': ['http://www.w3.org/2005/Atom'],
'@xmlns:codemeta': 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0',
'author': [
'some awesome author',
'another one',
'no one',
{
'email': 'hal@ccsd.cnrs.fr',
'name': 'HAL'
}
],
'client': 'hal',
'codemeta:applicationCategory': 'test',
'codemeta:author': {
'codemeta:name': 'Morane Gruenpeter'
},
'codemeta:dateCreated': ['2017-10-07T15:17:08Z',
'2015-04-06T17:08:47+02:00'],
'codemeta:datePublished': '2017-05-03T16:08:47+02:00',
'codemeta:description': 'this is the description',
'codemeta:developmentStatus': 'stable',
'codemeta:keywords': 'DSP programming',
'codemeta:license': [
{'codemeta:name': 'GNU General Public License v3.0 only'},
{'codemeta:name': 'CeCILL '
'Free '
'Software '
'License '
'Agreement '
'v1.1'}],
'codemeta:programmingLanguage': [
'php',
'python',
'C'
],
'codemeta:runtimePlatform': 'phpstorm',
'codemeta:url': 'https://hal-test.archives-ouvertes.fr/hal-01243065', # noqa
'codemeta:version': '1',
'external_identifier': [
'some-external-id',
'hal-01243065'
],
'id': 'hal-01243065',
'title': 'Composing a Web of Audio '
'Applications',
'url': 'https://hal-test.archives-ouvertes.fr/some-external-id'
}
expected_meta = {
'branch_name': 'master',
'origin': {
'type': 'deposit',
'url': 'https://hal-test.archives-ouvertes.fr/hal-01243065'
},
'origin_metadata': {
'metadata': metadata,
'provider': {
'metadata': {},
'provider_name': '',
'provider_type': 'deposit_client',
'provider_url': 'https://hal-test.archives-ouvertes.fr/'
},
'tool': {
'configuration': {'sword_version': '2'},
'name': 'swh-deposit',
'version': '0.0.1'
}
},
'revision': {
'author': SWH_PERSON,
'committer': SWH_PERSON,
'committer_date': {'negative_utc': False,
'offset': 120,
'timestamp': {'microseconds': 0,
'seconds': 1493820527}},
'date': {
'negative_utc': False,
'offset': 0,
'timestamp': {'microseconds': 0, 'seconds': 1507389428}
},
'message': '%s: Deposit %s in collection %s' % (
deposit_collection.name,
deposit.id,
deposit_collection.name
),
'metadata': metadata,
'synthetic': True,
'type': 'tar'
}
}
assert data == expected_meta
def test_read_metadata_4(
authenticated_client, deposit_collection, atom_dataset,
partial_deposit):
"""dateCreated/datePublished not provided, revision uses complete_date
"""
deposit = partial_deposit
- codemeta_entry_data = atom_dataset['metadata'] % b''
+ codemeta_entry_data = atom_dataset['metadata'] % ''
deposit = update_deposit_with_metadata(
authenticated_client, deposit_collection, deposit,
codemeta_entry_data)
# will use the deposit completed date as fallback date
deposit.complete_date = '2016-04-06'
deposit.save()
for url in private_get_raw_url_endpoints(deposit_collection, deposit):
response = authenticated_client.get(url)
assert response.status_code == status.HTTP_200_OK
assert response._headers['content-type'][1] == 'application/json'
data = response.json()
metadata = {
'@xmlns': 'http://www.w3.org/2005/Atom',
'@xmlns:codemeta': 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0',
'author': {'email': 'hal@ccsd.cnrs.fr',
'name': 'HAL'},
'client': 'hal',
'codemeta:applicationCategory': 'test',
'codemeta:author': {'codemeta:name': 'Morane '
'Gruenpeter'},
'codemeta:description': 'this is the '
'description',
'codemeta:developmentStatus': 'stable',
'codemeta:keywords': 'DSP programming',
'codemeta:license': [{'codemeta:name': 'GNU '
'General '
'Public '
'License '
'v3.0 '
'only'},
{'codemeta:name': 'CeCILL '
'Free '
'Software '
'License '
'Agreement '
'v1.1'}],
'codemeta:programmingLanguage': ['php',
'python',
'C'],
'codemeta:runtimePlatform': 'phpstorm',
'codemeta:url':
'https://hal-test.archives-ouvertes.fr/hal-01243065',
'codemeta:version': '1',
'external_identifier': 'hal-01243065',
'id': 'hal-01243065',
'title': 'Composing a Web of Audio '
'Applications'
}
expected_origin = {
'type': 'deposit',
'url': 'https://hal-test.archives-ouvertes.fr/%s' % (
deposit.external_id)
}
expected_origin_metadata = {
'metadata': metadata,
'provider': {
'metadata': {},
'provider_name': '',
'provider_type': 'deposit_client',
'provider_url': 'https://hal-test.archives-ouvertes.fr/'
},
'tool': {
'configuration': {'sword_version': '2'},
'name': 'swh-deposit',
'version': '0.0.1'
}
}
expected_revision = {
'author': {'email': 'robot@softwareheritage.org',
'fullname': 'Software Heritage',
'name': 'Software Heritage'},
'committer': {'email': 'robot@softwareheritage.org',
'fullname': 'Software Heritage',
'name': 'Software Heritage'},
'committer_date': {'negative_utc': False,
'offset': 0,
'timestamp': {'microseconds': 0,
'seconds': 1459900800}},
'date': {
'negative_utc': False,
'offset': 0,
'timestamp': {'microseconds': 0, 'seconds': 1459900800}},
'message': '%s: Deposit %s in collection %s' % (
deposit_collection.name, deposit.id, deposit_collection.name
),
'metadata': metadata,
'synthetic': True,
'type': 'tar'
}
expected_meta = {
'branch_name': 'master',
'origin': expected_origin,
'origin_metadata': expected_origin_metadata,
'revision': expected_revision,
}
assert data == expected_meta
def test_read_metadata_5(
authenticated_client, deposit_collection, partial_deposit,
atom_dataset):
"""dateCreated/datePublished provided, revision uses author/committer
date
If multiple dateCreated provided, the first occurrence (of
dateCreated) is selected. If multiple datePublished provided,
the first occurrence (of datePublished) is selected.
"""
deposit = partial_deposit
# add metadata to the deposit with multiple datePublished/dateCreated
- codemeta_entry_data = atom_dataset['metadata'] % b"""
+ codemeta_entry_data = atom_dataset['metadata'] % """
2015-04-06T17:08:47+02:00
2017-05-03T16:08:47+02:00
2016-04-06T17:08:47+02:00
2018-05-03T16:08:47+02:00
"""
deposit = update_deposit_with_metadata(
authenticated_client, deposit_collection, deposit,
codemeta_entry_data)
for url in private_get_raw_url_endpoints(deposit_collection, deposit):
response = authenticated_client.get(url)
assert response.status_code == status.HTTP_200_OK
assert response._headers['content-type'][1] == 'application/json'
data = response.json()
expected_origin = {
'type': 'deposit',
'url': 'https://hal-test.archives-ouvertes.fr/external-id-partial'
}
metadata = {
'@xmlns': 'http://www.w3.org/2005/Atom',
'@xmlns:codemeta': 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0',
'author': {'email': 'hal@ccsd.cnrs.fr',
'name': 'HAL'},
'client': 'hal',
'codemeta:applicationCategory': 'test',
'codemeta:author': {'codemeta:name': 'Morane '
'Gruenpeter'},
'codemeta:dateCreated': ['2015-04-06T17:08:47+02:00',
'2016-04-06T17:08:47+02:00'],
'codemeta:datePublished': ['2017-05-03T16:08:47+02:00',
'2018-05-03T16:08:47+02:00'],
'codemeta:description': 'this is the description',
'codemeta:developmentStatus': 'stable',
'codemeta:keywords': 'DSP programming',
'codemeta:license': [
{
'codemeta:name': 'GNU '
'General '
'Public '
'License '
'v3.0 '
'only'},
{
'codemeta:name': 'CeCILL '
'Free '
'Software '
'License '
'Agreement '
'v1.1'
}
],
'codemeta:programmingLanguage': ['php',
'python',
'C'],
'codemeta:runtimePlatform': 'phpstorm',
'codemeta:url': 'https://hal-test.archives-ouvertes.fr/hal-01243065', # noqa
'codemeta:version': '1',
'external_identifier': 'hal-01243065',
'id': 'hal-01243065',
'title': 'Composing a Web of Audio '
'Applications'
}
expected_origin_metadata = {
'metadata': metadata,
'provider': {
'metadata': {},
'provider_name': '',
'provider_type': 'deposit_client',
'provider_url': 'https://hal-test.archives-ouvertes.fr/'},
'tool': {
'configuration': {'sword_version': '2'},
'name': 'swh-deposit',
'version': '0.0.1'
}
}
expected_revision = {
'author': {'email': 'robot@softwareheritage.org',
'fullname': 'Software Heritage',
'name': 'Software Heritage'},
'committer': {'email': 'robot@softwareheritage.org',
'fullname': 'Software Heritage',
'name': 'Software Heritage'},
'committer_date': {'negative_utc': False,
'offset': 120,
'timestamp': {'microseconds': 0,
'seconds': 1493820527}},
'date': {'negative_utc': False,
'offset': 120,
'timestamp': {'microseconds': 0, 'seconds': 1428332927}},
'message': '%s: Deposit %s in collection %s' % (
deposit_collection.name, deposit.id, deposit_collection.name
),
'metadata': metadata,
'synthetic': True,
'type': 'tar'
}
expected_meta = {
'branch_name': 'master',
'origin': expected_origin,
'origin_metadata': expected_origin_metadata,
'revision': expected_revision
}
assert data == expected_meta
def test_access_to_nonexisting_deposit_returns_404_response(
authenticated_client, deposit_collection, ):
"""Read unknown collection should return a 404 response
"""
unknown_id = 999
try:
Deposit.objects.get(pk=unknown_id)
except Deposit.DoesNotExist:
assert True
for url in private_get_raw_url_endpoints(deposit_collection, unknown_id):
response = authenticated_client.get(url)
assert response.status_code == status.HTTP_404_NOT_FOUND
msg = 'Deposit with id %s does not exist' % unknown_id
assert msg in response.content.decode('utf-8')
diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py
index ffc86cff..a09c30fc 100644
--- a/swh/deposit/tests/api/test_deposit_update.py
+++ b/swh/deposit/tests/api/test_deposit_update.py
@@ -1,383 +1,383 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.urls import reverse
from rest_framework import status
from swh.deposit.models import Deposit, DepositRequest, DepositCollection
from swh.deposit.config import EDIT_SE_IRI, EM_IRI
from swh.deposit.parsers import parse_xml
from swh.deposit.tests.common import create_arborescence_archive, check_archive
def test_replace_archive_to_deposit_is_possible(
tmp_path, partial_deposit, deposit_collection, authenticated_client,
sample_archive, atom_dataset):
"""Replace all archive with another one should return a 204 response
"""
tmp_path = str(tmp_path)
# given
deposit = partial_deposit
requests = DepositRequest.objects.filter(
deposit=deposit,
type='archive')
assert len(list(requests)) == 1
check_archive(sample_archive['name'], requests[0].archive.name)
# we have no metadata for that deposit
requests = list(DepositRequest.objects.filter(
deposit=deposit, type='metadata'))
assert len(requests) == 0
response = authenticated_client.post(
reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]),
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data1'],
HTTP_SLUG=deposit.external_id,
HTTP_IN_PROGRESS=True)
requests = list(DepositRequest.objects.filter(
deposit=deposit, type='metadata'))
assert len(requests) == 1
update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id])
external_id = 'some-external-id-1'
archive2 = create_arborescence_archive(
tmp_path, 'archive2', 'file2', b'some other content in file')
response = authenticated_client.put(
update_uri,
content_type='application/zip', # as zip
data=archive2['data'],
# + headers
CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
archive2['name'], ))
assert response.status_code == status.HTTP_204_NO_CONTENT
requests = DepositRequest.objects.filter(
deposit=deposit,
type='archive')
assert len(list(requests)) == 1
check_archive(archive2['name'], requests[0].archive.name)
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type='metadata'))
assert len(requests) == 1
def test_replace_metadata_to_deposit_is_possible(
tmp_path, authenticated_client, partial_deposit_with_metadata,
deposit_collection, atom_dataset):
"""Replace all metadata with another one should return a 204 response
"""
# given
deposit = partial_deposit_with_metadata
raw_metadata0 = atom_dataset['entry-data0'] % deposit.external_id.encode(
'utf-8')
requests_meta = DepositRequest.objects.filter(
deposit=deposit,
type='metadata')
assert len(requests_meta) == 1
request_meta0 = requests_meta[0]
- assert request_meta0.raw_metadata == raw_metadata0.decode('utf-8')
+ assert request_meta0.raw_metadata == raw_metadata0
requests_archive0 = DepositRequest.objects.filter(
deposit=deposit, type='archive')
assert len(requests_archive0) == 1
update_uri = reverse(EDIT_SE_IRI, args=[
deposit_collection.name, deposit.id])
response = authenticated_client.put(
update_uri,
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data1'])
assert response.status_code == status.HTTP_204_NO_CONTENT
requests_meta = DepositRequest.objects.filter(
deposit=deposit,
type='metadata')
assert len(requests_meta) == 1
request_meta1 = requests_meta[0]
raw_metadata1 = request_meta1.raw_metadata
- assert raw_metadata1 == atom_dataset['entry-data1'].decode('utf-8')
+ assert raw_metadata1 == atom_dataset['entry-data1']
assert raw_metadata0 != raw_metadata1
assert request_meta0 != request_meta1
# check we did not touch the other parts
requests_archive1 = DepositRequest.objects.filter(
deposit=deposit, type='archive')
assert len(requests_archive1) == 1
assert set(requests_archive0) == set(requests_archive1)
def test_add_archive_to_deposit_is_possible(
tmp_path, authenticated_client, deposit_collection,
partial_deposit_with_metadata, sample_archive):
"""Add another archive to a deposit return a 201 response
"""
tmp_path = str(tmp_path)
deposit = partial_deposit_with_metadata
requests = DepositRequest.objects.filter(
deposit=deposit,
type='archive')
assert len(requests) == 1
check_archive(sample_archive['name'], requests[0].archive.name)
requests_meta0 = DepositRequest.objects.filter(
deposit=deposit, type='metadata')
assert len(requests_meta0) == 1
update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id])
external_id = 'some-external-id-1'
archive2 = create_arborescence_archive(
tmp_path, 'archive2', 'file2', b'some other content in file')
response = authenticated_client.post(
update_uri,
content_type='application/zip', # as zip
data=archive2['data'],
# + headers
CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
archive2['name'],))
assert response.status_code == status.HTTP_201_CREATED
requests = DepositRequest.objects.filter(
deposit=deposit,
type='archive').order_by('id')
assert len(requests) == 2
# first archive still exists
check_archive(sample_archive['name'], requests[0].archive.name)
# a new one was added
check_archive(archive2['name'], requests[1].archive.name)
# check we did not touch the other parts
requests_meta1 = DepositRequest.objects.filter(
deposit=deposit, type='metadata')
assert len(requests_meta1) == 1
assert set(requests_meta0) == set(requests_meta1)
def test_add_metadata_to_deposit_is_possible(
authenticated_client, deposit_collection,
partial_deposit_with_metadata, atom_dataset):
"""Add metadata with another one should return a 204 response
"""
deposit = partial_deposit_with_metadata
requests = DepositRequest.objects.filter(
deposit=deposit,
type='metadata')
assert len(requests) == 1
requests_archive0 = DepositRequest.objects.filter(
deposit=deposit, type='archive')
assert len(requests_archive0) == 1
update_uri = reverse(EDIT_SE_IRI, args=[deposit_collection.name,
deposit.id])
atom_entry = atom_dataset['entry-data1']
response = authenticated_client.post(
update_uri,
content_type='application/atom+xml;type=entry',
data=atom_entry)
assert response.status_code == status.HTTP_201_CREATED
requests = DepositRequest.objects.filter(
deposit=deposit,
type='metadata').order_by('id')
assert len(requests) == 2
expected_raw_meta0 = atom_dataset['entry-data0'] % (
deposit.external_id.encode('utf-8'))
# a new one was added
- assert requests[0].raw_metadata == expected_raw_meta0.decode('utf-8')
- assert requests[1].raw_metadata == atom_entry.decode('utf-8')
+ assert requests[0].raw_metadata == expected_raw_meta0
+ assert requests[1].raw_metadata == atom_entry
# check we did not touch the other parts
requests_archive1 = DepositRequest.objects.filter(
deposit=deposit, type='archive')
assert len(requests_archive1) == 1
assert set(requests_archive0) == set(requests_archive1)
def test_add_metadata_to_unknown_deposit(
deposit_collection, authenticated_client, atom_dataset):
"""Replacing metadata to unknown deposit should return a 404 response
"""
unknown_deposit_id = 1000
try:
Deposit.objects.get(pk=unknown_deposit_id)
except Deposit.DoesNotExist:
assert True
url = reverse(EDIT_SE_IRI, args=[deposit_collection, unknown_deposit_id])
response = authenticated_client.post(
url,
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data1'])
assert response.status_code == status.HTTP_404_NOT_FOUND
response_content = parse_xml(response.content)
assert 'Unknown collection name' in \
response_content['sword:error']['summary']
def test_add_metadata_to_unknown_collection(
partial_deposit, authenticated_client, atom_dataset):
"""Replacing metadata to unknown deposit should return a 404 response
"""
deposit = partial_deposit
unknown_collection_name = 'unknown-collection'
try:
DepositCollection.objects.get(name=unknown_collection_name)
except DepositCollection.DoesNotExist:
assert True
url = reverse(EDIT_SE_IRI, args=[unknown_collection_name, deposit.id])
response = authenticated_client.post(
url,
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data1'])
assert response.status_code == status.HTTP_404_NOT_FOUND
response_content = parse_xml(response.content)
assert 'Unknown collection name' in \
response_content['sword:error']['summary']
def test_replace_metadata_to_unknown_deposit(
authenticated_client, deposit_collection, atom_dataset):
"""Adding metadata to unknown deposit should return a 404 response
"""
unknown_deposit_id = 998
try:
Deposit.objects.get(pk=unknown_deposit_id)
except Deposit.DoesNotExist:
assert True
url = reverse(EDIT_SE_IRI, args=[
deposit_collection.name, unknown_deposit_id])
response = authenticated_client.put(
url,
content_type='application/atom+xml;type=entry',
data=atom_dataset['entry-data1'])
assert response.status_code == status.HTTP_404_NOT_FOUND
response_content = parse_xml(response.content)
assert 'Deposit with id %s does not exist' % unknown_deposit_id == \
response_content['sword:error']['summary']
def test_add_archive_to_unknown_deposit(
authenticated_client, deposit_collection, atom_dataset):
"""Adding metadata to unknown deposit should return a 404 response
"""
unknown_deposit_id = 997
try:
Deposit.objects.get(pk=unknown_deposit_id)
except Deposit.DoesNotExist:
assert True
url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id])
response = authenticated_client.post(url,
content_type='application/zip',
data=atom_dataset['entry-data1'])
assert response.status_code == status.HTTP_404_NOT_FOUND
response_content = parse_xml(response.content)
assert 'Deposit with id %s does not exist' % unknown_deposit_id == \
response_content['sword:error']['summary']
def test_replace_archive_to_unknown_deposit(
        authenticated_client, deposit_collection, atom_dataset):
    """Replacing archive to unknown deposit should return a 404 response

    """
    unknown_deposit_id = 996
    # Precondition: the deposit must really be absent. The former
    # try/except + `assert True` could never fail, even if the id existed.
    assert not Deposit.objects.filter(pk=unknown_deposit_id).exists()

    url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id])
    response = authenticated_client.put(
        url,
        content_type='application/zip',
        data=atom_dataset['entry-data1'])

    assert response.status_code == status.HTTP_404_NOT_FOUND
    response_content = parse_xml(response.content)
    assert 'Deposit with id %s does not exist' % unknown_deposit_id == \
        response_content['sword:error']['summary']
def test_post_metadata_to_em_iri_failure(
        authenticated_client, deposit_collection, partial_deposit,
        atom_dataset):
    """Update (POST) archive with wrong content type should return 400

    """
    expected_summary = (
        'Packaging format supported is restricted to '
        'application/zip, application/x-tar'
    )
    em_iri = reverse(
        EM_IRI, args=[deposit_collection.name, partial_deposit.id])

    # The content type below is not one of the accepted archive types
    response = authenticated_client.post(
        em_iri,
        content_type='application/x-gtar-compressed',
        data=atom_dataset['entry-data1'])

    assert response.status_code == status.HTTP_400_BAD_REQUEST
    error = parse_xml(response.content)['sword:error']
    assert expected_summary == error['summary']
def test_put_metadata_to_em_iri_failure(
        authenticated_client, deposit_collection, partial_deposit,
        atom_dataset):
    """Update (PUT) archive with wrong content type should return 400

    """
    expected_summary = (
        'Packaging format supported is restricted to '
        'application/zip, application/x-tar'
    )
    em_iri = reverse(
        EM_IRI, args=[deposit_collection.name, partial_deposit.id])

    # An atom entry is sent where an archive is expected
    response = authenticated_client.put(
        em_iri,
        content_type='application/atom+xml;type=entry',
        data=atom_dataset['entry-data1'])

    assert response.status_code == status.HTTP_400_BAD_REQUEST
    error = parse_xml(response.content)['sword:error']
    assert expected_summary == error['summary']
diff --git a/swh/deposit/tests/conftest.py b/swh/deposit/tests/conftest.py
index 6346896d..c410c96a 100644
--- a/swh/deposit/tests/conftest.py
+++ b/swh/deposit/tests/conftest.py
@@ -1,390 +1,390 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import base64
import pytest
import psycopg2
from django.urls import reverse
from django.test.utils import setup_databases # type: ignore
# mypy is asked to ignore the import statement above because setup_databases
# is not part of the d.t.utils.__all__ variable.
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from rest_framework import status
from rest_framework.test import APIClient
from typing import Mapping
from swh.scheduler import get_scheduler
from swh.scheduler.tests.conftest import * # noqa
from swh.deposit.config import setup_django_for
from swh.deposit.parsers import parse_xml
from swh.deposit.config import SWHDefaultConfig
from swh.deposit.config import (
COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED,
DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_LOAD_SUCCESS,
DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_LOAD_FAILURE
)
from swh.deposit.tests.common import create_arborescence_archive
# Credentials and attributes of the deposit client used across the tests.
TEST_USER = dict(
    username='test',
    password='password',
    email='test@example.org',
    provider_url='https://hal-test.archives-ouvertes.fr/',
    domain='archives-ouvertes.fr/',
    collection=dict(name='test'),
)
# Configuration handed to the deposit classes during the tests (returned by
# the `deposit_config` fixture).
TEST_CONFIG = dict(
    max_upload_size=500,
    extraction_dir='/tmp/swh-deposit/test/extraction-dir',
    checks=False,
    provider=dict(
        provider_name='',
        provider_type='deposit_client',
        provider_url='',
        metadata=dict(),
    ),
    tool=dict(
        name='swh-deposit',
        version='0.0.1',
        configuration=dict(sword_version='2'),
    ),
)
def pytest_configure():
    """pytest hook: set django up with the 'testing' settings flavor.

    """
    django_flavor = 'testing'
    setup_django_for(django_flavor)
@pytest.fixture()
def deposit_config():
    """Deposit configuration used by the tests (the TEST_CONFIG dict itself,
    not a copy).

    """
    config = TEST_CONFIG
    return config
@pytest.fixture(autouse=True)
def deposit_autoconfig(monkeypatch, deposit_config, swh_scheduler_config):
    """Enforce config for deposit classes inherited from SWHDefaultConfig."""
    def mock_parse_config(*args, **kw):
        # Shallow copy of the deposit config, completed with a freshly built
        # scheduler entry on each call
        return {
            **deposit_config,
            'scheduler': {
                'cls': 'local',
                'args': swh_scheduler_config,
            },
        }

    monkeypatch.setattr(
        SWHDefaultConfig, "parse_config_file", mock_parse_config)

    # Register the task type in the local scheduler
    get_scheduler('local', swh_scheduler_config).create_task_type({
        'type': 'load-deposit',
        'backend_name': 'swh.loader.packages.deposit.tasks.LoadDeposit',
        'description': 'why does this have not-null constraint?',
    })
@pytest.fixture(scope='session')
def django_db_setup(
        request,
        django_db_blocker,
        postgresql_proc):
    """Point django's default database to the postgresql process provided by
    the `postgresql_proc` fixture, then create the test databases.

    """
    from django.conf import settings
    # A plain mapping is used here: the previous set of (key, value) tuples
    # only worked as long as every value happened to be hashable.
    settings.DATABASES['default'].update({
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'tests',
        'USER': postgresql_proc.user,  # noqa
        'HOST': postgresql_proc.host,  # noqa
        'PORT': postgresql_proc.port,  # noqa
    })
    # Database access is blocked by default; lift the block for the setup
    with django_db_blocker.unblock():
        setup_databases(
            verbosity=request.config.option.verbose,
            interactive=False,
            keepdb=False)
def execute_sql(sql):
    """Execute sql to postgres db

    Args:
        sql: statement to execute, in autocommit mode, against the
            'postgres' database

    """
    conn = psycopg2.connect(database='postgres')
    try:
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        with conn.cursor() as cur:
            cur.execute(sql)
    finally:
        # psycopg2's `with connection:` only ends the transaction, it does
        # not close the connection; close it explicitly to avoid a leak.
        conn.close()
@pytest.fixture(autouse=True, scope='session')
def swh_proxy():
    """Automatically inject this fixture in all tests to ensure no outside
    connection takes place.

    """
    # Both proxy variables are set to the same local address
    for proxy_var in ('http_proxy', 'https_proxy'):
        os.environ[proxy_var] = 'http://localhost:999'
def create_deposit_collection(collection_name: str):
    """Return the deposit collection named collection_name, creating it
    first when it does not exist yet.

    """
    from swh.deposit.models import DepositCollection
    manager = DepositCollection._default_manager
    try:
        return manager.get(name=collection_name)
    except DepositCollection.DoesNotExist:
        new_collection = DepositCollection(name=collection_name)
        new_collection.save()
        return new_collection
def deposit_collection_factory(
        collection_name=TEST_USER['collection']['name']):
    """Build a fixture providing the deposit collection `collection_name`.

    """
    @pytest.fixture
    def _deposit_collection(db):
        # `collection_name` is taken from the enclosing factory call
        return create_deposit_collection(collection_name)

    return _deposit_collection


# Collection fixtures: the test user's collection and a second, unrelated one
deposit_collection = deposit_collection_factory()
deposit_another_collection = deposit_collection_factory('another-collection')
@pytest.fixture
def deposit_user(db, deposit_collection):
    """Create/Return the test_user "test"

    """
    from swh.deposit.models import DepositClient
    clients = DepositClient._default_manager
    username = TEST_USER['username']
    try:
        return clients.get(username=username)
    except DepositClient.DoesNotExist:
        pass

    # No such client yet: create it from the TEST_USER attributes and bind
    # it to the test collection
    creation_fields = {
        key: TEST_USER[key]
        for key in ('username', 'email', 'password', 'provider_url', 'domain')
    }
    new_user = clients.create_user(**creation_fields)
    new_user.collections = [deposit_collection.id]
    new_user.save()
    return new_user
@pytest.fixture
def client():
    """Override pytest-django one which does not work for djangorestframework.

    """
    drf_client = APIClient()  # <- drf's client
    return drf_client
@pytest.fixture
def authenticated_client(client, deposit_user):
    """Yield a client carrying the test user's HTTP Basic credentials; the
    client is logged out once the test is done.

    """
    # `pytest.yield_fixture` is deprecated: plain `pytest.fixture` supports
    # yield-style teardown.
    _token = '%s:%s' % (deposit_user.username, TEST_USER['password'])
    token = base64.b64encode(_token.encode('utf-8'))
    authorization = 'Basic %s' % token.decode('utf-8')
    client.credentials(HTTP_AUTHORIZATION=authorization)
    yield client
    client.logout()
@pytest.fixture
def sample_archive(tmp_path):
    """Build and return a sample archive named 'archive1' under tmp_path,
    holding a single file 'file1'.

    """
    # str conversion: pytest version limitation in previous version
    return create_arborescence_archive(
        str(tmp_path), 'archive1', 'file1', b'some content in file')
@pytest.fixture
def atom_dataset(datadir) -> Mapping[str, str]:
    """Load the atom files found in the 'atom' folder of datadir.

    Returns:
        Dict of atom name (filename up to its first '.') per content
        (str, decoded as utf-8)

    """
    atom_path = os.path.join(datadir, 'atom')
    data = {}
    for filename in os.listdir(atom_path):
        filepath = os.path.join(atom_path, filename)
        with open(filepath, 'rb') as f:
            raw_content = f.read().decode('utf-8')

        # Keep the filename without extension
        atom_name = filename.split('.')[0]
        data[atom_name] = raw_content

    return data
def create_deposit(
        authenticated_client, collection_name: str, sample_archive,
        external_id: str, deposit_status=DEPOSIT_STATUS_DEPOSITED):
    """Create a skeleton shell deposit

    The archive is posted to the collection, then the resulting deposit has
    its status forced to `deposit_status` when it differs.

    """
    from swh.deposit.models import Deposit

    headers = {
        'CONTENT_LENGTH': sample_archive['length'],
        'HTTP_SLUG': external_id,
        'HTTP_CONTENT_MD5': sample_archive['md5sum'],
        'HTTP_PACKAGING': 'http://purl.org/net/sword/package/SimpleZip',
        'HTTP_IN_PROGRESS': 'false',
        'HTTP_CONTENT_DISPOSITION': 'attachment; filename=%s' % (
            sample_archive['name']),
    }

    # when
    response = authenticated_client.post(
        reverse(COL_IRI, args=[collection_name]),
        content_type='application/zip',  # as zip
        data=sample_archive['data'],
        **headers)

    # then
    assert response.status_code == status.HTTP_201_CREATED

    deposit = Deposit._default_manager.get(external_id=external_id)
    status_differs = deposit.status != deposit_status
    if status_differs:
        deposit.status = deposit_status
        deposit.save()
    assert deposit.status == deposit_status
    return deposit
def create_binary_deposit(
        authenticated_client, collection_name: str, sample_archive,
        external_id: str, deposit_status: str = DEPOSIT_STATUS_DEPOSITED,
        atom_dataset: Mapping[str, str] = {}):
    """Create a deposit with both metadata and archive set. Then alters its status
    to `deposit_status`.

    Args:
        authenticated_client: client used to post the archive and metadata
        collection_name: name of the collection to deposit into
        sample_archive: archive to deposit (see create_deposit)
        external_id: external identifier of the deposit
        deposit_status: status the deposit ends up with
        atom_dataset: atom templates; its 'entry-data0' entry is formatted
            with the deposit's external id. The default is only read,
            never mutated.

    """
    deposit = create_deposit(
        authenticated_client, collection_name, sample_archive,
        external_id=external_id, deposit_status=DEPOSIT_STATUS_PARTIAL)

    # The atom templates are str: format them with the str external id.
    # Formatting with the utf-8 encoded bytes would inject the literal
    # "b'...'" representation into the metadata.
    metadata = atom_dataset['entry-data0'] % deposit.external_id

    response = authenticated_client.post(
        reverse(EDIT_SE_IRI, args=[collection_name, deposit.id]),
        content_type='application/atom+xml;type=entry',
        data=metadata,
        HTTP_SLUG=deposit.external_id,
        HTTP_IN_PROGRESS='true')

    assert response.status_code == status.HTTP_201_CREATED
    assert deposit.status == DEPOSIT_STATUS_PARTIAL

    from swh.deposit.models import Deposit
    # Reload the deposit as the request above updated it in the db
    deposit = Deposit._default_manager.get(pk=deposit.id)
    if deposit.status != deposit_status:
        deposit.status = deposit_status
        deposit.save()

    assert deposit.status == deposit_status
    return deposit
def deposit_factory(deposit_status=DEPOSIT_STATUS_DEPOSITED):
    """Build deposit with a specific status

    """
    @pytest.fixture()
    def _deposit(sample_archive, deposit_collection, authenticated_client):
        # `deposit_status` is taken from the enclosing factory call
        return create_deposit(
            authenticated_client, deposit_collection.name, sample_archive,
            external_id='external-id-%s' % deposit_status,
            deposit_status=deposit_status)

    return _deposit


# One deposit fixture per deposit status
deposited_deposit = deposit_factory()
rejected_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_REJECTED)
partial_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_PARTIAL)
verified_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_VERIFIED)
completed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS)
failed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_FAILURE)
@pytest.fixture
def partial_deposit_with_metadata(
        sample_archive, deposit_collection, authenticated_client,
        atom_dataset):
    """Returns deposit with archive and metadata provided, status 'partial'

    """
    creation_args = dict(
        external_id='external-id-partial',
        deposit_status=DEPOSIT_STATUS_PARTIAL,
        atom_dataset=atom_dataset,
    )
    return create_binary_deposit(
        authenticated_client, deposit_collection.name, sample_archive,
        **creation_args)
@pytest.fixture
def partial_deposit_only_metadata(
        deposit_collection, authenticated_client,
        atom_dataset):
    """Returns deposit created out of an atom entry only (no archive is
    posted), status 'partial'

    """
    from swh.deposit.models import Deposit

    collection_iri = reverse(COL_IRI, args=[deposit_collection.name])
    response = authenticated_client.post(
        collection_iri,
        content_type='application/atom+xml;type=entry',
        data=atom_dataset['entry-data1'],
        HTTP_SLUG='external-id-partial',
        HTTP_IN_PROGRESS=True)
    assert response.status_code == status.HTTP_201_CREATED

    # The deposit identifier is read back from the response
    deposit_id = parse_xml(response.content)['deposit_id']
    deposit = Deposit._default_manager.get(pk=deposit_id)
    assert deposit.status == DEPOSIT_STATUS_PARTIAL
    return deposit
@pytest.fixture
def complete_deposit(sample_archive, deposit_collection, authenticated_client):
    """Returns a completed deposit (load success)

    """
    origin_context = 'https://hal.archives-ouvertes.fr/hal-01727745'
    swh_id = 'swh:1:dir:42a13fc721c8716ff695d0d62fc851d641f3a12b'
    # NOTE(review): 'swh:rev:1:' does not follow the 'swh:1:<type>:' layout
    # of swh_id above - confirm this is intended
    swh_anchor_id = 'swh:rev:1:548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10'

    deposit = create_deposit(
        authenticated_client, deposit_collection.name, sample_archive,
        external_id='external-id-complete',
        deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS
    )
    # Each context is the identifier and the origin url joined by ';'
    deposit.swh_id = swh_id
    deposit.swh_id_context = '%s;%s' % (swh_id, origin_context)
    deposit.swh_anchor_id = swh_anchor_id
    deposit.swh_anchor_id_context = '%s;%s' % (swh_anchor_id, origin_context)
    deposit.save()
    return deposit
@pytest.fixture()
def tmp_path(tmp_path):
    """Override the fixture of the same name to provide it as a str.

    """
    # issue with oldstable's pytest version
    path_as_str = str(tmp_path)
    return path_as_str
diff --git a/tox.ini b/tox.ini
index aa8038a0..e81b324f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,35 +1,37 @@
[tox]
-envlist=flake8,mypy,py3
+envlist=flake8,mypy,py3-django{1,2}
[testenv]
extras =
testing
deps =
# the dependency below is needed for now as a workaround for
# https://github.com/pypa/pip/issues/6239
swh.core[http] >= 0.0.75
dev: ipdb
pytest-cov
+ django1: Django>=1.11,<2
+ django2: Django>=2,<3
commands =
pytest \
!dev: --cov {envsitepackagesdir}/swh/deposit --cov-branch \
{envsitepackagesdir}/swh/deposit \
{posargs}
[testenv:flake8]
skip_install = true
deps =
flake8
commands =
{envpython} -m flake8 \
--exclude=.tox,.git,__pycache__,.tox,.eggs,*.egg,swh/deposit/migrations
[testenv:mypy]
setenv = DJANGO_SETTINGS_MODULE=swh.deposit.settings.testing
extras =
testing
deps =
mypy
django-stubs
commands =
mypy swh