diff --git a/swh/deposit/tests/api/test_collection_post_multipart.py b/swh/deposit/tests/api/test_collection_post_multipart.py index 7aacbfda..e06f1171 100644 --- a/swh/deposit/tests/api/test_collection_post_multipart.py +++ b/swh/deposit/tests/api/test_collection_post_multipart.py @@ -1,460 +1,352 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests handling of multipart requests to POST Col-IRI.""" from io import BytesIO import uuid from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse import pytest from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml -from swh.deposit.tests.common import check_archive +from swh.deposit.tests.common import check_archive, post_multipart def test_post_deposit_multipart_without_slug_header( - authenticated_client, deposit_collection, atom_dataset, mocker, deposit_user + authenticated_client, + deposit_collection, + atom_dataset, + mocker, + deposit_user, + sample_archive, ): # given url = reverse(COL_IRI, args=[deposit_collection.name]) - + data_atom_entry = atom_dataset["entry-data-deposit-binary"] id_ = str(uuid.uuid4()) mocker.patch("uuid.uuid4", return_value=id_) - archive_content = b"some content representing archive" - archive = InMemoryUploadedFile( - BytesIO(archive_content), - field_name="archive0", - name="archive0", - content_type="application/zip", - size=len(archive_content), - charset=None, - ) - - data_atom_entry = atom_dataset["entry-data-deposit-binary"] - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry.encode("utf-8")), - field_name="atom0", - name="atom0", - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset="utf-8", - ) - # when - response = authenticated_client.post( + response = post_multipart( + authenticated_client, url, - format="multipart", - data={"archive": archive, "atom_entry": atom_entry,}, - # + headers + sample_archive, + data_atom_entry, HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == deposit_user.provider_url + id_ assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_post_deposit_multipart_zip( authenticated_client, deposit_collection, atom_dataset, sample_archive ): """one multipart deposit (zip+xml) should be accepted """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) - - archive = InMemoryUploadedFile( - BytesIO(sample_archive["data"]), - field_name=sample_archive["name"], - name=sample_archive["name"], - content_type="application/zip", - size=sample_archive["length"], - charset=None, - ) - data_atom_entry = atom_dataset["entry-data-deposit-binary"] - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry.encode("utf-8")), - field_name="atom0", - name="atom0", - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset="utf-8", - ) - external_id = "external-id" # when - response = authenticated_client.post( + response = post_multipart( + authenticated_client, url, - format="multipart", - data={"archive": archive, "atom_entry": atom_entry,}, - # + headers + sample_archive, + data_atom_entry, HTTP_IN_PROGRESS="false", HTTP_SLUG=external_id, ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert len(deposit_requests) == 2 for deposit_request in deposit_requests: assert deposit_request.deposit == deposit if deposit_request.type == "archive": check_archive(sample_archive["name"], deposit_request.archive.name) assert deposit_request.metadata is None assert deposit_request.raw_metadata is None else: assert ( deposit_request.metadata["atom:id"] == "urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a" ) assert deposit_request.raw_metadata == data_atom_entry def test_post_deposit_multipart_tar( authenticated_client, deposit_collection, atom_dataset, sample_archive ): """one multipart deposit (tar+xml) should be accepted """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) - - # from django.core.files import uploadedfile data_atom_entry = atom_dataset["entry-data-deposit-binary"] - - archive = InMemoryUploadedFile( - BytesIO(sample_archive["data"]), - field_name=sample_archive["name"], - name=sample_archive["name"], - content_type="application/x-tar", - size=sample_archive["length"], - charset=None, - ) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry.encode("utf-8")), - field_name="atom0", - name="atom0", - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset="utf-8", - ) - external_id = "external-id" # when - response = authenticated_client.post( + response = post_multipart( + authenticated_client, url, - format="multipart", - data={"archive": archive, "atom_entry": atom_entry,}, - # + headers + sample_archive, + data_atom_entry, HTTP_IN_PROGRESS="false", HTTP_SLUG=external_id, ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert len(deposit_requests) == 2 for deposit_request in deposit_requests: assert deposit_request.deposit == deposit if deposit_request.type == "archive": check_archive(sample_archive["name"], deposit_request.archive.name) assert deposit_request.metadata is None assert deposit_request.raw_metadata is None else: assert ( deposit_request.metadata["atom:id"] == "urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a" ) assert deposit_request.raw_metadata == data_atom_entry def test_post_deposit_multipart_put_to_replace_metadata( authenticated_client, deposit_collection, atom_dataset, sample_archive ): """One multipart deposit followed by a metadata update should be accepted """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) - data_atom_entry = atom_dataset["entry-data-deposit-binary"] - - archive = InMemoryUploadedFile( - BytesIO(sample_archive["data"]), - field_name=sample_archive["name"], - name=sample_archive["name"], - content_type="application/zip", - size=sample_archive["length"], - charset=None, - ) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry.encode("utf-8")), - field_name="atom0", - name="atom0", - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset="utf-8", - ) - external_id = "external-id" # when - response = authenticated_client.post( + response = post_multipart( + authenticated_client, url, - format="multipart", - data={"archive": archive, "atom_entry": atom_entry,}, - # + headers + sample_archive, + data_atom_entry, HTTP_IN_PROGRESS="true", HTTP_SLUG=external_id, ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == "partial" assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert len(deposit_requests) == 2 for deposit_request in deposit_requests: assert deposit_request.deposit == deposit if deposit_request.type == "archive": check_archive(sample_archive["name"], deposit_request.archive.name) else: assert ( deposit_request.metadata["atom:id"] == "urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a" ) assert deposit_request.raw_metadata == data_atom_entry replace_metadata_uri = response._headers["location"][1] response = authenticated_client.put( replace_metadata_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-deposit-binary"], HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_204_NO_CONTENT # deposit_id did not change deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert len(deposit_requests) == 2 for deposit_request in deposit_requests: assert deposit_request.deposit == deposit if deposit_request.type == "archive": check_archive(sample_archive["name"], deposit_request.archive.name) else: assert ( deposit_request.metadata["atom:id"] == "urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a" ) assert ( deposit_request.raw_metadata == atom_dataset["entry-data-deposit-binary"] ) # FAILURE scenarios def test_post_deposit_multipart_only_archive_and_atom_entry( authenticated_client, deposit_collection ): """Multipart deposit only accepts one archive and one atom+xml""" # given url = reverse(COL_IRI, args=[deposit_collection.name]) archive_content = b"some content representing archive" archive = InMemoryUploadedFile( BytesIO(archive_content), field_name="archive0", name="archive0", content_type="application/x-tar", size=len(archive_content), charset=None, ) other_archive_content = b"some-other-content" other_archive = InMemoryUploadedFile( BytesIO(other_archive_content), field_name="atom0", name="atom0", content_type="application/x-tar", size=len(other_archive_content), charset="utf-8", ) # when response = authenticated_client.post( url, format="multipart", data={"archive": archive, "atom_entry": other_archive,}, # + headers HTTP_IN_PROGRESS="false", HTTP_SLUG="external-id", ) # then assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE assert ( "Only 1 application/zip (or application/x-tar) archive" in response.content.decode("utf-8") ) # when archive.seek(0) response = authenticated_client.post( url, format="multipart", data={"archive": archive,}, # + headers HTTP_IN_PROGRESS="false", HTTP_SLUG="external-id", ) # then assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE assert ( "You must provide both 1 application/zip (or " "application/x-tar) and 1 atom+xml entry for " "multipart deposit" in response.content.decode("utf-8") ) is True def test_post_deposit_multipart_400_when_badly_formatted_xml( authenticated_client, deposit_collection, sample_archive, atom_dataset ): # given url = reverse(COL_IRI, args=[deposit_collection.name]) - - archive_content = sample_archive["data"] - archive = InMemoryUploadedFile( - BytesIO(archive_content), - field_name=sample_archive["name"], - name=sample_archive["name"], - content_type="application/zip", - size=len(archive_content), - charset=None, - ) - data_atom_entry_ko = atom_dataset["entry-data-ko"] - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry_ko.encode("utf-8")), - field_name="atom0", - name="atom0", - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry_ko), - charset="utf-8", - ) # when - response = authenticated_client.post( + response = post_multipart( + authenticated_client, url, - format="multipart", - data={"archive": archive, "atom_entry": atom_entry,}, - # + headers + sample_archive, + data_atom_entry_ko, HTTP_IN_PROGRESS="false", HTTP_SLUG="external-id", ) assert b"Malformed xml metadata" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_multipart_if_upload_size_limit_exceeded( authenticated_client, deposit_collection, atom_dataset, sample_archive ): # given url = reverse(COL_IRI, args=[deposit_collection.name]) - data = sample_archive["data"] * 8 - archive = InMemoryUploadedFile( - BytesIO(data), - field_name=sample_archive["name"], - name=sample_archive["name"], - content_type="application/zip", - size=len(data), - charset=None, - ) - + archive = { + **sample_archive, + "data": sample_archive["data"] * 8, + } data_atom_entry = atom_dataset["entry-data-deposit-binary"] - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry.encode("utf-8")), - field_name="atom0", - name="atom0", - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset="utf-8", - ) external_id = "external-id" # when - response = authenticated_client.post( + response = post_multipart( + authenticated_client, url, - format="multipart", - data={"archive": archive, "atom_entry": atom_entry,}, - # + headers + archive, + data_atom_entry, HTTP_IN_PROGRESS="false", HTTP_SLUG=external_id, ) # then assert response.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE assert b"Upload size limit exceeded" in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py index 4bb1ff27..ec83a3f8 100644 --- a/swh/deposit/tests/api/test_deposit_update.py +++ b/swh/deposit/tests/api/test_deposit_update.py @@ -1,180 +1,140 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests updates on SE-IRI.""" -from io import BytesIO - -from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse from rest_framework import status from swh.deposit.config import ( DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_PARTIAL, EDIT_IRI, SE_IRI, ) from swh.deposit.models import Deposit, DepositRequest -from swh.deposit.tests.common import post_atom +from swh.deposit.tests.common import post_atom, post_multipart, put_multipart def test_add_both_archive_and_metadata_to_deposit( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, sample_archive, deposit_user, ): """Scenario: Add both a new archive and new metadata to a partial deposit is ok Response: 201 """ deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id requests = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests) == 1 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 - update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) - archive = InMemoryUploadedFile( - BytesIO(sample_archive["data"]), - field_name=sample_archive["name"], - name=sample_archive["name"], - content_type="application/x-tar", - size=sample_archive["length"], - charset=None, - ) - data_atom_entry = atom_dataset["entry-data1"] - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry.encode("utf-8")), - field_name="atom0", - name="atom0", - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset="utf-8", - ) - - update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) - response = authenticated_client.post( - update_uri, - format="multipart", - data={"archive": archive, "atom_entry": atom_entry,}, + response = post_multipart( + authenticated_client, + reverse(SE_IRI, args=[deposit_collection.name, deposit.id]), + sample_archive, + data_atom_entry, ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="metadata").order_by( "id" ) assert len(requests) == 1 + 1, "New deposit request archive got added" expected_raw_meta0 = atom_dataset["entry-data0"] % origin_url # a new one was added assert requests[0].raw_metadata == expected_raw_meta0 assert requests[1].raw_metadata == data_atom_entry # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 + 1, "New deposit request metadata got added" def test_post_metadata_empty_post_finalize_deposit_ok( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, ): """Empty atom post entry with header in-progress to false transitions deposit to 'deposited' status Response: 200 """ deposit = partial_deposit_with_metadata assert deposit.status == DEPOSIT_STATUS_PARTIAL update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) response = post_atom( authenticated_client, update_uri, data="", size=0, HTTP_IN_PROGRESS=False, ) assert response.status_code == status.HTTP_200_OK deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_put_update_metadata_and_archive_deposit_partial_nominal( tmp_path, authenticated_client, partial_deposit_with_metadata, deposit_collection, atom_dataset, sample_archive, deposit_user, ): """Scenario: Replace metadata and archive(s) with new ones should be ok Response: 204 """ # given deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id raw_metadata0 = atom_dataset["entry-data0"] % origin_url requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta0 = requests_meta[0] assert request_meta0.raw_metadata == raw_metadata0 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 - archive = InMemoryUploadedFile( - BytesIO(sample_archive["data"]), - field_name=sample_archive["name"], - name=sample_archive["name"], - content_type="application/x-tar", - size=sample_archive["length"], - charset=None, - ) - data_atom_entry = atom_dataset["entry-data1"] - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry.encode("utf-8")), - field_name="atom0", - name="atom0", - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset="utf-8", - ) - - update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) - response = authenticated_client.put( - update_uri, - format="multipart", - data={"archive": archive, "atom_entry": atom_entry,}, + response = put_multipart( + authenticated_client, + reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]), + sample_archive, + data_atom_entry, ) assert response.status_code == status.HTTP_204_NO_CONTENT # check we updated the metadata part requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta1 = requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == data_atom_entry assert raw_metadata0 != raw_metadata1 assert request_meta0 != request_meta1 # and the archive part requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) != set(requests_archive1) diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py index f2999c46..2e9008e0 100644 --- a/swh/deposit/tests/common.py +++ b/swh/deposit/tests/common.py @@ -1,177 +1,215 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib +from io import BytesIO import os import re import tarfile import tempfile +from django.core.files.uploadedfile import InMemoryUploadedFile + from swh.core import tarball def compute_info(archive_path): """Given a path, compute information on path. """ with open(archive_path, "rb") as f: length = 0 sha1sum = hashlib.sha1() md5sum = hashlib.md5() data = b"" for chunk in f: sha1sum.update(chunk) md5sum.update(chunk) length += len(chunk) data += chunk return { "dir": os.path.dirname(archive_path), "name": os.path.basename(archive_path), "path": archive_path, "length": length, "sha1sum": sha1sum.hexdigest(), "md5sum": md5sum.hexdigest(), "data": data, } def _compress(path, extension, dir_path): """Compress path according to extension """ if extension == "zip" or extension == "tar": return tarball.compress(path, extension, dir_path) elif "." in extension: split_ext = extension.split(".") if split_ext[0] != "tar": raise ValueError( "Development error, only zip or tar archive supported, " "%s not supported" % extension ) # deal with specific tar mode = split_ext[1] supported_mode = ["xz", "gz", "bz2"] if mode not in supported_mode: raise ValueError( "Development error, only %s supported, %s not supported" % (supported_mode, mode) ) files = tarball._ls(dir_path) with tarfile.open(path, "w:%s" % mode) as t: for fpath, fname in files: t.add(fpath, arcname=fname, recursive=False) return path def create_arborescence_archive( root_path, archive_name, filename, content, up_to_size=None, extension="zip" ): """Build an archive named archive_name in the root_path. This archive contains one file named filename with the content content. Args: root_path (str): Location path of the archive to create archive_name (str): Archive's name (without extension) filename (str): Archive's content is only one filename content (bytes): Content of the filename up_to_size (int | None): Fill in the blanks size to oversize or complete an archive's size extension (str): Extension of the archive to write (default is zip) Returns: dict with the keys: - dir: the directory of that archive - path: full path to the archive - sha1sum: archive's sha1sum - length: archive's length """ os.makedirs(root_path, exist_ok=True) archive_path_dir = tempfile.mkdtemp(dir=root_path) dir_path = os.path.join(archive_path_dir, archive_name) os.mkdir(dir_path) filepath = os.path.join(dir_path, filename) _length = len(content) count = 0 batch_size = 128 with open(filepath, "wb") as f: f.write(content) if up_to_size: # fill with blank content up to a given size count += _length while count < up_to_size: f.write(b"0" * batch_size) count += batch_size _path = "%s.%s" % (dir_path, extension) _path = _compress(_path, extension, dir_path) return compute_info(_path) def create_archive_with_archive(root_path, name, archive): """Create an archive holding another. """ invalid_archive_path = os.path.join(root_path, name) with tarfile.open(invalid_archive_path, "w:gz") as _archive: _archive.add(archive["path"], arcname=archive["name"]) return compute_info(invalid_archive_path) def check_archive(archive_name: str, archive_name_to_check: str): """Helper function to ensure archive_name is present within the archive_name_to_check. Raises: AssertionError if archive_name is not present within archive_name_to_check """ ARCHIVE_FILEPATH_PATTERN = re.compile( r"client_[0-9].*/[0-9]{8}-[0-9]{6}\.[0-9]{6}/[a-zA-Z0-9.].*" ) assert ARCHIVE_FILEPATH_PATTERN.match(archive_name_to_check) if "." in archive_name: filename, extension = archive_name.split(".") pattern = re.compile(".*/%s.*\\.%s" % (filename, extension)) else: pattern = re.compile(".*/%s" % archive_name) assert pattern.match(archive_name_to_check) is not None def _post_or_put_archive(f, url, archive, slug=None, in_progress=None, **kwargs): default_kwargs = dict( content_type="application/zip", CONTENT_LENGTH=archive["length"], HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive["name"],), HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", ) kwargs = {**default_kwargs, **kwargs} return f(url, data=archive["data"], HTTP_CONTENT_MD5=archive["md5sum"], **kwargs,) def post_archive(authenticated_client, *args, **kwargs): return _post_or_put_archive(authenticated_client.post, *args, **kwargs) def put_archive(authenticated_client, *args, **kwargs): return _post_or_put_archive(authenticated_client.put, *args, **kwargs) def post_atom(authenticated_client, url, data, **kwargs): return authenticated_client.post( url, content_type="application/atom+xml;type=entry", data=data, **kwargs ) def put_atom(authenticated_client, url, data, **kwargs): return authenticated_client.put( url, content_type="application/atom+xml;type=entry", data=data, **kwargs ) + + +def _post_or_put_multipart(f, url, archive, atom_entry, **kwargs): + archive = InMemoryUploadedFile( + BytesIO(archive["data"]), + field_name=archive["name"], + name=archive["name"], + content_type="application/x-tar", + size=archive["length"], + charset=None, + ) + + atom_entry = InMemoryUploadedFile( + BytesIO(atom_entry.encode("utf-8")), + field_name="atom0", + name="atom0", + content_type='application/atom+xml; charset="utf-8"', + size=len(atom_entry), + charset="utf-8", + ) + + return f( + url, + format="multipart", + data={"archive": archive, "atom_entry": atom_entry,}, + **kwargs, + ) + + +def post_multipart(authenticated_client, *args, **kwargs): + return _post_or_put_multipart(authenticated_client.post, *args, **kwargs) + + +def put_multipart(authenticated_client, *args, **kwargs): + return _post_or_put_multipart(authenticated_client.put, *args, **kwargs)