diff --git a/swh/deposit/tests/api/test_deposit_binary.py b/swh/deposit/tests/api/test_deposit_binary.py index 00b4179b..126c0bc5 100644 --- a/swh/deposit/tests/api/test_deposit_binary.py +++ b/swh/deposit/tests/api/test_deposit_binary.py @@ -1,562 +1,575 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse import pytest from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED, EM_IRI from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import check_archive, create_arborescence_archive def test_post_deposit_binary_no_slug( authenticated_client, deposit_collection, sample_archive ): """Posting a binary deposit without slug header should return 400 """ url = reverse(COL_IRI, args=[deposit_collection.name]) # when response = authenticated_client.post( url, content_type="application/zip", # as zip data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) assert b"Missing SLUG header" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_binary_support( authenticated_client, deposit_collection, sample_archive ): """Binary upload with content-type not in [zip,x-tar] should return 415 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = authenticated_client.post( url, content_type="application/octet-stream", data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_ok( authenticated_client, deposit_collection, sample_archive ): """Binary upload with correct headers should return 201 with receipt """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = authenticated_client.post( url, content_type="application/zip", # as zip data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (sample_archive["name"],), ) # then response_content = parse_xml(BytesIO(response.content)) assert response.status_code == status.HTTP_201_CREATED deposit_id = response_content["deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_request = DepositRequest.objects.get(deposit=deposit) check_archive(sample_archive["name"], deposit_request.archive.name) assert deposit_request.metadata is None assert deposit_request.raw_metadata is None response_content = parse_xml(BytesIO(response.content)) assert response_content["deposit_archive"] == sample_archive["name"] assert int(response_content["deposit_id"]) == deposit.id assert response_content["deposit_status"] == deposit.status edit_se_iri = reverse("edit_se_iri", args=[deposit_collection.name, deposit.id]) assert response._headers["location"] == ( "Location", "http://testserver" + edit_se_iri, ) def test_post_deposit_binary_failure_unsupported_packaging_header( authenticated_client, deposit_collection, sample_archive ): """Bin deposit without supported content_disposition header returns 400 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id" # when response = authenticated_client.post( url, content_type="application/zip", data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="something-unsupported", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST + assert ( + b"The packaging provided something-unsupported is not supported" + in response.content + ) + with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_no_content_disposition_header( authenticated_client, deposit_collection, sample_archive ): """Binary upload without content_disposition header should return 400 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id" # when response = authenticated_client.post( url, content_type="application/zip", data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST + assert b"CONTENT_DISPOSITION header is mandatory" in response.content + with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_mediation_not_supported( authenticated_client, deposit_collection, sample_archive ): """Binary upload with mediation should return a 412 response """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = authenticated_client.post( url, content_type="application/zip", data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_ON_BEHALF_OF="someone", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_412_PRECONDITION_FAILED with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded( authenticated_client, deposit_collection, sample_archive, tmp_path ): """Binary upload must not exceed the limit set up... """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) archive = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file", up_to_size=500 ) external_id = "some-external-id" # when response = authenticated_client.post( url, content_type="application/zip", data=archive["data"], # + headers CONTENT_LENGTH=archive["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE assert b"Upload size limit exceeded" in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_2_post_2_different_deposits( authenticated_client, deposit_collection, sample_archive ): """2 posting deposits should return 2 different 201 with receipt """ url = reverse(COL_IRI, args=[deposit_collection.name]) # when response = authenticated_client.post( url, content_type="application/zip", # as zip data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG="some-external-id-1", HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) deposits = Deposit.objects.all() assert len(deposits) == 1 assert deposits[0] == deposit # second post response = authenticated_client.post( url, content_type="application/x-tar", # as zip data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG="another-external-id", HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=filename1", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id2 = response_content["deposit_id"] deposit2 = Deposit.objects.get(pk=deposit_id2) assert deposit != deposit2 deposits = Deposit.objects.all().order_by("id") assert len(deposits) == 2 assert list(deposits), [deposit == deposit2] def test_post_deposit_binary_and_post_to_add_another_archive( authenticated_client, deposit_collection, sample_archive, tmp_path ): """Updating a deposit should return a 201 with receipt """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = authenticated_client.post( url, content_type="application/zip", # as zip data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="true", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (sample_archive["name"],), ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == "partial" assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit assert deposit_request.type == "archive" check_archive(sample_archive["name"], deposit_request.archive.name) # 2nd archive to upload archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) # uri to update the content update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) # adding another archive for the deposit and finalizing it response = authenticated_client.post( update_uri, content_type="application/zip", # as zip data=archive2["data"], # + headers CONTENT_LENGTH=archive2["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"]), ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_requests = list( DepositRequest.objects.filter(deposit=deposit).order_by("id") ) # 2 deposit requests for the same deposit assert len(deposit_requests) == 2 assert deposit_requests[0].deposit == deposit assert deposit_requests[0].type == "archive" check_archive(sample_archive["name"], deposit_requests[0].archive.name) assert deposit_requests[1].deposit == deposit assert deposit_requests[1].type == "archive" check_archive(archive2["name"], deposit_requests[1].archive.name) # only 1 deposit in db deposits = Deposit.objects.all() assert len(deposits) == 1 def test_post_deposit_then_update_refused( authenticated_client, deposit_collection, sample_archive, atom_dataset, tmp_path ): """Updating a deposit with status 'ready' should return a 400 """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = authenticated_client.post( url, content_type="application/zip", # as zip data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit check_archive("filename0", deposit_request.archive.name) # updating/adding is forbidden # uri to update the content edit_se_iri = reverse("edit_se_iri", args=[deposit_collection.name, deposit_id]) em_iri = reverse("em_iri", args=[deposit_collection.name, deposit_id]) # Testing all update/add endpoint should fail # since the status is ready archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file 2" ) # replacing file is no longer possible since the deposit's # status is ready r = authenticated_client.put( em_iri, content_type="application/zip", data=archive2["data"], CONTENT_LENGTH=archive2["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) assert r.status_code == status.HTTP_400_BAD_REQUEST + assert b"You can only act on deposit with status 'partial'" in r.content # adding file is no longer possible since the deposit's status # is ready r = authenticated_client.post( em_iri, content_type="application/zip", data=archive2["data"], CONTENT_LENGTH=archive2["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) assert r.status_code == status.HTTP_400_BAD_REQUEST + assert b"You can only act on deposit with status 'partial'" in r.content # replacing metadata is no longer possible since the deposit's # status is ready r = authenticated_client.put( edit_se_iri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST + assert b"You can only act on deposit with status 'partial'" in r.content # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( edit_se_iri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST + assert b"You can only act on deposit with status 'partial'" in r.content archive_content = b"some content representing archive" archive = InMemoryUploadedFile( BytesIO(archive_content), field_name="archive0", name="archive0", content_type="application/zip", size=len(archive_content), charset=None, ) atom_entry = InMemoryUploadedFile( BytesIO(atom_dataset["entry-data-deposit-binary"].encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(atom_dataset["entry-data-deposit-binary"]), charset="utf-8", ) # replacing multipart metadata is no longer possible since the # deposit's status is ready r = authenticated_client.put( edit_se_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST + assert b"You can only act on deposit with status 'partial'" in r.content # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( edit_se_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST + assert b"You can only act on deposit with status 'partial'" in r.content diff --git a/swh/deposit/tests/api/test_deposit_delete.py b/swh/deposit/tests/api/test_deposit_delete.py index 76959c24..23a83b8c 100644 --- a/swh/deposit/tests/api/test_deposit_delete.py +++ b/swh/deposit/tests/api/test_deposit_delete.py @@ -1,123 +1,131 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from typing import Dict, Mapping from django.urls import reverse from rest_framework import status from swh.deposit.config import ( ARCHIVE_KEY, DEPOSIT_STATUS_DEPOSITED, EDIT_SE_IRI, EM_IRI, METADATA_KEY, ) from swh.deposit.models import Deposit, DepositRequest def count_deposit_request_types(deposit_requests) -> Mapping[str, int]: deposit_request_types = defaultdict(int) # type: Dict[str, int] for dr in deposit_requests: deposit_request_types[dr.type] += 1 return deposit_request_types def test_delete_archive_on_partial_deposit_works( authenticated_client, partial_deposit_with_metadata, deposit_collection ): """Removing partial deposit's archive should return a 204 response """ deposit_id = partial_deposit_with_metadata.id deposit = Deposit.objects.get(pk=deposit_id) deposit_requests = DepositRequest.objects.filter(deposit=deposit) # deposit request type: 'archive', 1 'metadata' deposit_request_types = count_deposit_request_types(deposit_requests) assert deposit_request_types == {ARCHIVE_KEY: 1, METADATA_KEY: 1} # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_204_NO_CONTENT deposit = Deposit.objects.get(pk=deposit_id) deposit_requests2 = DepositRequest.objects.filter(deposit=deposit) deposit_request_types = count_deposit_request_types(deposit_requests2) assert deposit_request_types == {METADATA_KEY: 1} def test_delete_archive_on_undefined_deposit_fails( authenticated_client, deposit_collection, sample_archive ): """Delete undefined deposit returns a 404 response """ # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, 999]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_404_NOT_FOUND def test_delete_non_partial_deposit( authenticated_client, deposit_collection, deposited_deposit ): """Delete !partial status deposit should return a 400 response """ deposit = deposited_deposit assert deposit.status == DEPOSIT_STATUS_DEPOSITED # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_400_BAD_REQUEST + assert ( + b"You can only act on deposit with status 'partial'" in response.content + ) + deposit = Deposit.objects.get(pk=deposit.id) assert deposit is not None def test_delete_partial_deposit( authenticated_client, deposit_collection, partial_deposit ): """Delete deposit should return a 204 response """ # given deposit = partial_deposit # when url = reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(url) # then assert response.status_code == status.HTTP_204_NO_CONTENT deposit_requests = list(DepositRequest.objects.filter(deposit=deposit)) assert deposit_requests == [] deposits = list(Deposit.objects.filter(pk=deposit.id)) assert deposits == [] def test_delete_on_edit_se_iri_cannot_delete_non_partial_deposit( authenticated_client, deposit_collection, complete_deposit ): """Delete !partial deposit should return a 400 response """ # given deposit = complete_deposit # when url = reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(url) # then assert response.status_code == status.HTTP_400_BAD_REQUEST + assert ( + b"You can only act on deposit with status 'partial'" in response.content + ) + deposit = Deposit.objects.get(pk=deposit.id) assert deposit is not None diff --git a/swh/deposit/tests/api/test_deposit_private_update_status.py b/swh/deposit/tests/api/test_deposit_private_update_status.py index f93801de..fc7e0754 100644 --- a/swh/deposit/tests/api/test_deposit_private_update_status.py +++ b/swh/deposit/tests/api/test_deposit_private_update_status.py @@ -1,191 +1,196 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import json from django.urls import reverse from rest_framework import status from swh.deposit.api.private.deposit_update_status import MANDATORY_KEYS from swh.deposit.config import ( DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_SUCCESS, PRIVATE_PUT_DEPOSIT, ) from swh.deposit.models import Deposit from swh.model.identifiers import DIRECTORY, REVISION, SNAPSHOT, swhid PRIVATE_PUT_DEPOSIT_NC = PRIVATE_PUT_DEPOSIT + "-nc" def private_check_url_endpoints(collection, deposit): """There are 2 endpoints to check (one with collection, one without)""" return [ reverse(PRIVATE_PUT_DEPOSIT, args=[collection.name, deposit.id]), reverse(PRIVATE_PUT_DEPOSIT_NC, args=[deposit.id]), ] def test_update_deposit_status_success_with_info( authenticated_client, deposit_collection, ready_deposit_verified ): """Update deposit with load success should require all information to succeed """ deposit = ready_deposit_verified expected_status = DEPOSIT_STATUS_LOAD_SUCCESS origin_url = "something" directory_id = "42a13fc721c8716ff695d0d62fc851d641f3a12b" revision_id = "47dc6b4636c7f6cba0df83e3d5490bf4334d987e" snapshot_id = "68c0d26104d47e278dd6be07ed61fafb561d0d20" full_body_info = { "status": DEPOSIT_STATUS_LOAD_SUCCESS, "revision_id": revision_id, "directory_id": directory_id, "snapshot_id": snapshot_id, "origin_url": origin_url, } for url in private_check_url_endpoints(deposit_collection, deposit): dir_id = swhid(DIRECTORY, directory_id) rev_id = swhid(REVISION, revision_id) snp_id = swhid(SNAPSHOT, snapshot_id) expected_swh_id = "swh:1:dir:%s" % directory_id expected_swh_id_context = ( f"{dir_id};origin={origin_url};" + f"visit={snp_id};anchor={rev_id};path=/" ) response = authenticated_client.put( url, content_type="application/json", data=json.dumps(full_body_info), ) assert response.status_code == status.HTTP_204_NO_CONTENT deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == expected_status assert deposit.swh_id == expected_swh_id assert deposit.swh_id_context == expected_swh_id_context # Reset deposit deposit = ready_deposit_verified deposit.save() def test_update_deposit_status_rejected_with_info( authenticated_client, deposit_collection, ready_deposit_verified ): """Update deposit with rejected status needs few information to succeed """ deposit = ready_deposit_verified for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.put( url, content_type="application/json", data=json.dumps({"status": DEPOSIT_STATUS_LOAD_FAILURE}), ) assert response.status_code == status.HTTP_204_NO_CONTENT deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_LOAD_FAILURE assert deposit.swh_id is None assert deposit.swh_id_context is None # Reset status deposit = ready_deposit_verified deposit.save() def test_update_deposit_status_success_with_incomplete_data( authenticated_client, deposit_collection, ready_deposit_verified ): """Update deposit status with status success and incomplete information should fail """ deposit = ready_deposit_verified origin_url = "something" directory_id = "42a13fc721c8716ff695d0d62fc851d641f3a12b" revision_id = "47dc6b4636c7f6cba0df83e3d5490bf4334d987e" snapshot_id = "68c0d26104d47e278dd6be07ed61fafb561d0d20" new_status = DEPOSIT_STATUS_LOAD_SUCCESS full_body_info = { "status": new_status, "revision_id": revision_id, "directory_id": directory_id, "snapshot_id": snapshot_id, "origin_url": origin_url, } for url in private_check_url_endpoints(deposit_collection, deposit): for key in MANDATORY_KEYS: # Crafting body with missing information so that it raises body = copy.deepcopy(full_body_info) body.pop(key) # make the body incomplete response = authenticated_client.put( url, content_type="application/json", data=json.dumps(body), ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( f"deposit status to {new_status} requires information {key}" in response.content.decode("utf-8") ) def test_update_deposit_status_will_fail_with_unknown_status( authenticated_client, deposit_collection, ready_deposit_verified ): """Unknown status for update should return a 400 response """ deposit = ready_deposit_verified for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.put( url, content_type="application/json", data=json.dumps({"status": "unknown"}) ) assert response.status_code == status.HTTP_400_BAD_REQUEST + assert b"Possible status in " in response.content def test_update_deposit_status_will_fail_with_no_status_key( authenticated_client, deposit_collection, ready_deposit_verified ): """No status provided for update should return a 400 response """ deposit = ready_deposit_verified for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.put( url, content_type="application/json", data=json.dumps({"something": "something"}), ) assert response.status_code == status.HTTP_400_BAD_REQUEST + assert b"The status key is mandatory with possible values" in response.content def test_update_deposit_status_success_without_swh_id_fail( authenticated_client, deposit_collection, ready_deposit_verified ): """Providing successful status without swh_id should return a 400 """ deposit = ready_deposit_verified for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.put( url, content_type="application/json", data=json.dumps({"status": DEPOSIT_STATUS_LOAD_SUCCESS}), ) assert response.status_code == status.HTTP_400_BAD_REQUEST + assert ( + b"Updating deposit status to done requires information" in response.content + ) diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py index 45037d51..ef4ca1b0 100644 --- a/swh/deposit/tests/api/test_deposit_update.py +++ b/swh/deposit/tests/api/test_deposit_update.py @@ -1,810 +1,805 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO import attr from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse from rest_framework import status +from swh.deposit.api.common import ACCEPT_ARCHIVE_CONTENT_TYPES from swh.deposit.config import ( DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_PARTIAL, EDIT_SE_IRI, EM_IRI, APIConfig, ) from swh.deposit.models import Deposit, DepositCollection, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import check_archive, create_arborescence_archive from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import parse_swhid, swhid from swh.model.model import ( MetadataAuthority, MetadataAuthorityType, MetadataFetcher, MetadataTargetType, RawExtrinsicMetadata, ) from swh.storage.interface import PagedResult def test_replace_archive_to_deposit_is_possible( tmp_path, partial_deposit, deposit_collection, authenticated_client, sample_archive, atom_dataset, ): """Replace all archive with another one should return a 204 response """ tmp_path = str(tmp_path) # given deposit = partial_deposit requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(sample_archive["name"], requests[0].archive.name) # we have no metadata for that deposit requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 0 response = authenticated_client.post( reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]), content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_SLUG=deposit.external_id, HTTP_IN_PROGRESS=True, ) requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = authenticated_client.put( update_uri, content_type="application/zip", # as zip data=archive2["data"], # + headers CONTENT_LENGTH=archive2["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"],), ) assert response.status_code == status.HTTP_204_NO_CONTENT requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(archive2["name"], requests[0].archive.name) # check we did not touch the other parts requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 def test_replace_metadata_to_deposit_is_possible( tmp_path, authenticated_client, partial_deposit_with_metadata, deposit_collection, atom_dataset, ): """Replace all metadata with another one should return a 204 response """ # given deposit = partial_deposit_with_metadata raw_metadata0 = atom_dataset["entry-data0"] % deposit.external_id.encode("utf-8") requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta0 = requests_meta[0] assert request_meta0.raw_metadata == raw_metadata0 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_204_NO_CONTENT requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta1 = requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == atom_dataset["entry-data1"] assert raw_metadata0 != raw_metadata1 assert request_meta0 != request_meta1 # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) == set(requests_archive1) def test_add_archive_to_deposit_is_possible( tmp_path, authenticated_client, deposit_collection, partial_deposit_with_metadata, sample_archive, ): """Add another archive to a deposit return a 201 response """ tmp_path = str(tmp_path) deposit = partial_deposit_with_metadata requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests) == 1 check_archive(sample_archive["name"], requests[0].archive.name) requests_meta0 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta0) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = authenticated_client.post( update_uri, content_type="application/zip", # as zip data=archive2["data"], # + headers CONTENT_LENGTH=archive2["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"],), ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="archive").order_by( "id" ) assert len(requests) == 2 # first archive still exists check_archive(sample_archive["name"], requests[0].archive.name) # a new one was added check_archive(archive2["name"], requests[1].archive.name) # check we did not touch the other parts requests_meta1 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta1) == 1 assert set(requests_meta0) == set(requests_meta1) def test_add_metadata_to_deposit_is_possible( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, ): """Add metadata with another one should return a 204 response """ deposit = partial_deposit_with_metadata requests = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests) == 1 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]) atom_entry = atom_dataset["entry-data1"] response = authenticated_client.post( update_uri, content_type="application/atom+xml;type=entry", data=atom_entry ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="metadata").order_by( "id" ) assert len(requests) == 2 expected_raw_meta0 = atom_dataset["entry-data0"] % ( deposit.external_id.encode("utf-8") ) # a new one was added assert requests[0].raw_metadata == expected_raw_meta0 assert requests[1].raw_metadata == atom_entry # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) == set(requests_archive1) def test_add_both_archive_and_metadata_to_deposit( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, sample_archive, ): """Scenario: Add both a new archive and new metadata to a partial deposit is ok Response: 201 """ deposit = partial_deposit_with_metadata requests = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests) == 1 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]) archive = InMemoryUploadedFile( BytesIO(sample_archive["data"]), field_name=sample_archive["name"], name=sample_archive["name"], content_type="application/x-tar", size=sample_archive["length"], charset=None, ) data_atom_entry = atom_dataset["entry-data1"] atom_entry = InMemoryUploadedFile( BytesIO(data_atom_entry.encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(data_atom_entry), charset="utf-8", ) update_uri = reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.post( update_uri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="metadata").order_by( "id" ) assert len(requests) == 1 + 1, "New deposit request archive got added" expected_raw_meta0 = atom_dataset["entry-data0"] % ( deposit.external_id.encode("utf-8") ) # a new one was added assert requests[0].raw_metadata == expected_raw_meta0 assert requests[1].raw_metadata == data_atom_entry # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 + 1, "New deposit request metadata got added" def test_post_metadata_empty_post_finalize_deposit_ok( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, ): """Empty atom post entry with header in-progress to false transitions deposit to 'deposited' status Response: 200 """ deposit = partial_deposit_with_metadata assert deposit.status == DEPOSIT_STATUS_PARTIAL update_uri = reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.post( update_uri, content_type="application/atom+xml;type=entry", data="", size=0, HTTP_IN_PROGRESS=False, ) assert response.status_code == status.HTTP_200_OK deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_add_metadata_to_unknown_deposit( deposit_collection, authenticated_client, atom_dataset ): """Replacing metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 1000 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EDIT_SE_IRI, args=[deposit_collection, unknown_deposit_id]) response = authenticated_client.post( url, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert "Unknown collection name" in response_content["sword:error"]["summary"] def test_add_metadata_to_unknown_collection( partial_deposit, authenticated_client, atom_dataset ): """Replacing metadata to unknown deposit should return a 404 response """ deposit = partial_deposit unknown_collection_name = "unknown-collection" try: DepositCollection.objects.get(name=unknown_collection_name) except DepositCollection.DoesNotExist: assert True url = reverse(EDIT_SE_IRI, args=[unknown_collection_name, deposit.id]) response = authenticated_client.post( url, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert "Unknown collection name" in response_content["sword:error"]["summary"] def test_replace_metadata_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 998 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EDIT_SE_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.put( url, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit with id %s does not exist" % unknown_deposit_id == response_content["sword:error"]["summary"] ) def test_add_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 997 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.post( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit with id %s does not exist" % unknown_deposit_id == response_content["sword:error"]["summary"] ) def test_replace_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Replacing archive to unknown deposit should return a 404 response """ unknown_deposit_id = 996 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.put( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit with id %s does not exist" % unknown_deposit_id == response_content["sword:error"]["summary"] ) def test_post_metadata_to_em_iri_failure( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Update (POST) archive with wrong content type should return 400 """ deposit = partial_deposit update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.post( update_uri, content_type="application/x-gtar-compressed", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_400_BAD_REQUEST - response_content = parse_xml(response.content) - msg = ( - "Packaging format supported is restricted to " - + "application/zip, application/x-tar" - ) - assert msg == response_content["sword:error"]["summary"] + assert b"Packaging format supported is restricted" in response.content + for supported_format in ACCEPT_ARCHIVE_CONTENT_TYPES: + assert supported_format.encode() in response.content def test_put_metadata_to_em_iri_failure( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Update (PUT) archive with wrong content type should return 400 """ # given deposit = partial_deposit # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST - response_content = parse_xml(response.content) - msg = ( - "Packaging format supported is restricted to " - + "application/zip, application/x-tar" - ) - assert msg == response_content["sword:error"]["summary"] + assert b"Packaging format supported is restricted" in response.content + for supported_format in ACCEPT_ARCHIVE_CONTENT_TYPES: + assert supported_format.encode() in response.content def test_put_update_metadata_and_archive_deposit_partial_nominal( tmp_path, authenticated_client, partial_deposit_with_metadata, deposit_collection, atom_dataset, sample_archive, ): """Scenario: Replace metadata and archive(s) with new ones should be ok Response: 204 """ # given deposit = partial_deposit_with_metadata raw_metadata0 = atom_dataset["entry-data0"] % deposit.external_id.encode("utf-8") requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta0 = requests_meta[0] assert request_meta0.raw_metadata == raw_metadata0 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 archive = InMemoryUploadedFile( BytesIO(sample_archive["data"]), field_name=sample_archive["name"], name=sample_archive["name"], content_type="application/x-tar", size=sample_archive["length"], charset=None, ) data_atom_entry = atom_dataset["entry-data1"] atom_entry = InMemoryUploadedFile( BytesIO(data_atom_entry.encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(data_atom_entry), charset="utf-8", ) update_uri = reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.put( update_uri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert response.status_code == status.HTTP_204_NO_CONTENT # check we updated the metadata part requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta1 = requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == data_atom_entry assert raw_metadata0 != raw_metadata1 assert request_meta0 != request_meta1 # and the archive part requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) != set(requests_archive1) def test_put_update_metadata_done_deposit_nominal( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, sample_data, swh_storage, ): """Nominal scenario, client send an update of metadata on a deposit with status "done" with an existing swhid. Such swhid has its metadata updated accordingly both in the deposit backend and in the metadata storage. Response: 204 """ deposit_swhid = parse_swhid(complete_deposit.swh_id) assert deposit_swhid.object_type == "directory" directory_id = hash_to_bytes(deposit_swhid.object_id) # directory targeted by the complete_deposit does not exist in the storage assert list(swh_storage.directory_missing([directory_id])) == [directory_id] # so let's create a directory reference in the storage (current deposit targets an # unknown swhid) existing_directory = sample_data.directory swh_storage.directory_add([existing_directory]) assert list(swh_storage.directory_missing([existing_directory.id])) == [] # and patch one complete deposit swh_id so it targets said reference complete_deposit.swh_id = swhid("directory", existing_directory.id) complete_deposit.save() actual_existing_requests_archive = DepositRequest.objects.filter( deposit=complete_deposit, type="archive" ) nb_archives = len(actual_existing_requests_archive) actual_existing_requests_metadata = DepositRequest.objects.filter( deposit=complete_deposit, type="metadata" ) nb_metadata = len(actual_existing_requests_metadata) update_uri = reverse( EDIT_SE_IRI, args=[deposit_collection.name, complete_deposit.id] ) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_X_CHECK_SWHID=complete_deposit.swh_id, ) assert response.status_code == status.HTTP_204_NO_CONTENT new_requests_meta = DepositRequest.objects.filter( deposit=complete_deposit, type="metadata" ) assert len(new_requests_meta) == nb_metadata + 1 request_meta1 = new_requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == atom_dataset["entry-data1"] # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter( deposit=complete_deposit, type="archive" ) assert len(requests_archive1) == nb_archives assert set(actual_existing_requests_archive) == set(requests_archive1) # Ensure metadata stored in the metadata storage is consistent metadata_authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url=complete_deposit.client.provider_url, metadata={"name": complete_deposit.client.last_name}, ) actual_authority = swh_storage.metadata_authority_get( MetadataAuthorityType.DEPOSIT_CLIENT, url=complete_deposit.client.provider_url ) assert actual_authority == metadata_authority config = APIConfig() metadata_fetcher = MetadataFetcher( name=config.tool["name"], version=config.tool["version"], metadata=config.tool["configuration"], ) actual_fetcher = swh_storage.metadata_fetcher_get( config.tool["name"], config.tool["version"] ) assert actual_fetcher == metadata_fetcher directory_swhid = parse_swhid(complete_deposit.swh_id) page_results = swh_storage.raw_extrinsic_metadata_get( MetadataTargetType.DIRECTORY, directory_swhid, metadata_authority ) assert page_results == PagedResult( results=[ RawExtrinsicMetadata( type=MetadataTargetType.DIRECTORY, id=directory_swhid, discovery_date=request_meta1.date, authority=attr.evolve(metadata_authority, metadata=None), fetcher=attr.evolve(metadata_fetcher, metadata=None), format="sword-v2-atom-codemeta", metadata=raw_metadata1.encode(), origin=complete_deposit.origin_url, ) ], next_page_token=None, ) def test_put_update_metadata_done_deposit_failure_mismatched_swhid( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit with SWHID not matching the deposit's. Response: 400 """ incorrect_swhid = "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea" assert complete_deposit.swh_id != incorrect_swhid update_uri = reverse( EDIT_SE_IRI, args=[deposit_collection.name, complete_deposit.id] ) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_X_CHECK_SWHID=incorrect_swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Mismatched provided SWHID" in response.content def test_put_update_metadata_done_deposit_failure_malformed_xml( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done with a malformed xml Response: 400 """ update_uri = reverse( EDIT_SE_IRI, args=[deposit_collection.name, complete_deposit.id] ) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-ko"], HTTP_X_CHECK_SWHID=complete_deposit.swh_id, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Malformed xml metadata" in response.content def test_put_update_metadata_done_deposit_failure_empty_xml( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done with an empty xml. Response: 400 """ update_uri = reverse( EDIT_SE_IRI, args=[deposit_collection.name, complete_deposit.id] ) for atom_key in ["entry-data-empty-body", "entry-data-empty-body-no-namespace"]: atom_content = atom_dataset[atom_key] response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_content, HTTP_X_CHECK_SWHID=complete_deposit.swh_id, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Empty body request is not supported" in response.content def test_put_update_metadata_done_deposit_failure_functional_checks( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done without required incomplete metadata Response: 400 """ update_uri = reverse( EDIT_SE_IRI, args=[deposit_collection.name, complete_deposit.id] ) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", # no title, nor author, nor name fields data=atom_dataset["entry-data-fail-metadata-functional-checks"], HTTP_X_CHECK_SWHID=complete_deposit.swh_id, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Functional metadata checks failure" in response.content # detail on the errors assert b"- Mandatory fields are missing (author)" in response.content assert ( b"- Mandatory alternate fields are missing (name or title)" in response.content )