diff --git a/swh/deposit/api/checks.py b/swh/deposit/api/checks.py index 05c1e565..ce752cb4 100644 --- a/swh/deposit/api/checks.py +++ b/swh/deposit/api/checks.py @@ -1,63 +1,48 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Functional Metadata checks: Mandatory fields: - 'author' - 'name' or 'title' """ from typing import Dict, Optional, Tuple MANDATORY_FIELDS_MISSING = "Mandatory fields are missing" -ALTERNATE_FIELDS_MISSING = "Mandatory alternate fields are missing" def check_metadata(metadata: Dict) -> Tuple[bool, Optional[Dict]]: """Check metadata for mandatory field presence. Args: metadata: Metadata dictionary to check for mandatory fields Returns: tuple (status, error_detail): True, None if metadata are ok (False, ) otherwise. """ - # following fields are mandatory - required_fields = { - "atom:author": False, - } # at least one value per couple below is mandatory alternate_fields = { - ("atom:name", "atom:title"): False, + ("atom:name", "atom:title", "codemeta:name"): False, + ("atom:author", "codemeta:author"): False, } for field, value in metadata.items(): - for name in required_fields: - if name in field: - required_fields[name] = True - for possible_names in alternate_fields: for possible_name in possible_names: if possible_name in field: alternate_fields[possible_names] = True continue - mandatory_result = [k for k, v in required_fields.items() if not v] - optional_result = [" or ".join(k) for k, v in alternate_fields.items() if not v] + mandatory_result = [" or ".join(k) for k, v in alternate_fields.items() if not v] - if mandatory_result == [] and optional_result == []: + if mandatory_result == []: return True, None - detail = [] - if mandatory_result != []: - detail.append({"summary": MANDATORY_FIELDS_MISSING, "fields": mandatory_result}) - if optional_result != []: - detail.append( - {"summary": ALTERNATE_FIELDS_MISSING, "fields": optional_result,} - ) + detail = [{"summary": MANDATORY_FIELDS_MISSING, "fields": mandatory_result}] return False, {"metadata": detail} diff --git a/swh/deposit/tests/api/test_checks.py b/swh/deposit/tests/api/test_checks.py index dfc71a6a..7aec749b 100644 --- a/swh/deposit/tests/api/test_checks.py +++ b/swh/deposit/tests/api/test_checks.py @@ -1,66 +1,94 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.deposit.api.checks import check_metadata @pytest.mark.parametrize( "metadata_ok", [ { "atom:url": "something", "atom:external_identifier": "something-else", "atom:name": "foo", "atom:author": "someone", }, { "atom:url": "some url", "atom:external_identifier": "some id", "atom:title": "bar", "atom:author": "no one", }, + {"atom:url": "some url", "codemeta:name": "bar", "codemeta:author": "no one",}, ], ) def test_api_checks_check_metadata_ok(metadata_ok, swh_checks_deposit): actual_check, detail = check_metadata(metadata_ok) - assert actual_check is True + assert actual_check is True, detail assert detail is None @pytest.mark.parametrize( "metadata_ko,expected_summary", [ ( { "atom:url": "something", "atom:external_identifier": "something-else", "atom:author": "someone", }, { - "summary": "Mandatory alternate fields are missing", - "fields": ["atom:name or atom:title"], + "summary": "Mandatory fields are missing", + "fields": ["atom:name or atom:title or codemeta:name"], }, ), ( { "atom:url": "something", "atom:external_identifier": "something-else", "atom:title": "foobar", }, - {"summary": "Mandatory fields are missing", "fields": ["atom:author"],}, + { + "summary": "Mandatory fields are missing", + "fields": ["atom:author or codemeta:author"], + }, + ), + ( + { + "atom:url": "something", + "atom:external_identifier": "something-else", + "codemeta:title": "bar", + "atom:author": "someone", + }, + { + "summary": "Mandatory fields are missing", + "fields": ["atom:name or atom:title or codemeta:name"], + }, + ), + ( + { + "atom:url": "something", + "atom:external_identifier": "something-else", + "atom:title": "foobar", + "author": "foo", + }, + { + "summary": "Mandatory fields are missing", + "fields": ["atom:author or codemeta:author"], + }, ), ], ) def test_api_checks_check_metadata_ko( metadata_ko, expected_summary, swh_checks_deposit ): """Missing optional field should be caught """ actual_check, error_detail = check_metadata(metadata_ko) assert actual_check is False assert error_detail == {"metadata": [expected_summary]} diff --git a/swh/deposit/tests/api/test_deposit_private_check.py b/swh/deposit/tests/api/test_deposit_private_check.py index e47f69b2..b89868c5 100644 --- a/swh/deposit/tests/api/test_deposit_private_check.py +++ b/swh/deposit/tests/api/test_deposit_private_check.py @@ -1,209 +1,211 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.urls import reverse import pytest from rest_framework import status -from swh.deposit.api.checks import ALTERNATE_FIELDS_MISSING, MANDATORY_FIELDS_MISSING +from swh.deposit.api.checks import MANDATORY_FIELDS_MISSING from swh.deposit.api.private.deposit_check import ( MANDATORY_ARCHIVE_INVALID, MANDATORY_ARCHIVE_MISSING, MANDATORY_ARCHIVE_UNSUPPORTED, ) from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT, ) from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import ( create_arborescence_archive, create_archive_with_archive, ) PRIVATE_CHECK_DEPOSIT_NC = PRIVATE_CHECK_DEPOSIT + "-nc" def private_check_url_endpoints(collection, deposit): """There are 2 endpoints to check (one with collection, one without)""" return [ reverse(PRIVATE_CHECK_DEPOSIT, args=[collection.name, deposit.id]), reverse(PRIVATE_CHECK_DEPOSIT_NC, args=[deposit.id]), ] @pytest.mark.parametrize("extension", ["zip", "tar", "tar.gz", "tar.bz2", "tar.xz"]) def test_deposit_ok( authenticated_client, deposit_collection, ready_deposit_ok, extension ): """Proper deposit should succeed the checks (-> status ready) """ deposit = ready_deposit_ok for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_VERIFIED deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_VERIFIED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() @pytest.mark.parametrize("extension", ["zip", "tar", "tar.gz", "tar.bz2", "tar.xz"]) def test_deposit_invalid_tarball( tmp_path, authenticated_client, deposit_collection, extension ): """Deposit with tarball (of 1 tarball) should fail the checks: rejected """ deposit = create_deposit_archive_with_archive( tmp_path, extension, authenticated_client, deposit_collection.name ) for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_REJECTED details = data["details"] # archive checks failure assert len(details["archive"]) == 1 assert details["archive"][0]["summary"] == MANDATORY_ARCHIVE_INVALID deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED def test_deposit_ko_missing_tarball( authenticated_client, deposit_collection, ready_deposit_only_metadata ): """Deposit without archive should fail the checks: rejected """ deposit = ready_deposit_only_metadata assert deposit.status == DEPOSIT_STATUS_DEPOSITED for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_REJECTED details = data["details"] # archive checks failure assert len(details["archive"]) == 1 assert details["archive"][0]["summary"] == MANDATORY_ARCHIVE_MISSING deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def test_deposit_ko_unsupported_tarball( tmp_path, authenticated_client, deposit_collection, ready_deposit_invalid_archive ): """Deposit with an unsupported tarball should fail the checks: rejected """ deposit = ready_deposit_invalid_archive assert DEPOSIT_STATUS_DEPOSITED == deposit.status for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_REJECTED details = data["details"] # archive checks failure assert len(details["archive"]) == 1 assert details["archive"][0]["summary"] == MANDATORY_ARCHIVE_UNSUPPORTED # metadata check failure - assert len(details["metadata"]) == 2 + assert len(details["metadata"]) == 1 mandatory = details["metadata"][0] assert mandatory["summary"] == MANDATORY_FIELDS_MISSING - assert set(mandatory["fields"]) == set(["atom:author"]) - alternate = details["metadata"][1] - assert alternate["summary"] == ALTERNATE_FIELDS_MISSING - assert alternate["fields"] == ["atom:name or atom:title"] + assert set(mandatory["fields"]) == set( + [ + "atom:author or codemeta:author", + "atom:name or atom:title or codemeta:name", + ] + ) deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def test_check_deposit_metadata_ok( authenticated_client, deposit_collection, ready_deposit_ok ): """Proper deposit should succeed the checks (-> status ready) with all **MUST** metadata using the codemeta metadata test set """ deposit = ready_deposit_ok assert deposit.status == DEPOSIT_STATUS_DEPOSITED for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_VERIFIED deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_VERIFIED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def create_deposit_archive_with_archive( root_path, archive_extension, client, collection_name ): # we create the holding archive to a given extension archive = create_arborescence_archive( root_path, "archive1", "file1", b"some content in file", extension=archive_extension, ) # now we create an archive holding the first created archive invalid_archive = create_archive_with_archive(root_path, "invalid.tgz", archive) # we deposit it response = client.post( reverse(COL_IRI, args=[collection_name]), content_type="application/x-tar", data=invalid_archive["data"], CONTENT_LENGTH=invalid_archive["length"], HTTP_MD5SUM=invalid_archive["md5sum"], HTTP_SLUG="external-id", HTTP_IN_PROGRESS=False, HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (invalid_archive["name"],), ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_status = response_content["swh:deposit_status"] assert deposit_status == DEPOSIT_STATUS_DEPOSITED deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) assert DEPOSIT_STATUS_DEPOSITED == deposit.status return deposit diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py index 1095f86c..dc9f8251 100644 --- a/swh/deposit/tests/api/test_deposit_update.py +++ b/swh/deposit/tests/api/test_deposit_update.py @@ -1,884 +1,885 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO import attr from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse from rest_framework import status from swh.deposit.api.common import ACCEPT_ARCHIVE_CONTENT_TYPES from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_PARTIAL, EDIT_IRI, EM_IRI, SE_IRI, APIConfig, ) from swh.deposit.models import Deposit, DepositCollection, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import check_archive, create_arborescence_archive from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import parse_swhid, swhid from swh.model.model import ( MetadataAuthority, MetadataAuthorityType, MetadataFetcher, MetadataTargetType, RawExtrinsicMetadata, ) from swh.storage.interface import PagedResult def test_replace_archive_to_deposit_is_possible( tmp_path, partial_deposit, deposit_collection, authenticated_client, sample_archive, atom_dataset, ): """Replace all archive with another one should return a 204 response """ tmp_path = str(tmp_path) # given deposit = partial_deposit requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(sample_archive["name"], requests[0].archive.name) # we have no metadata for that deposit requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 0 response = authenticated_client.post( reverse(SE_IRI, args=[deposit_collection.name, deposit.id]), content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_SLUG=deposit.external_id, HTTP_IN_PROGRESS=True, ) requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = authenticated_client.put( update_uri, content_type="application/zip", # as zip data=archive2["data"], # + headers CONTENT_LENGTH=archive2["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"],), ) assert response.status_code == status.HTTP_204_NO_CONTENT requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(archive2["name"], requests[0].archive.name) # check we did not touch the other parts requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 def test_replace_metadata_to_deposit_is_possible( tmp_path, authenticated_client, partial_deposit_with_metadata, deposit_collection, atom_dataset, deposit_user, ): """Replace all metadata with another one should return a 204 response """ # given deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id raw_metadata0 = atom_dataset["entry-data0"] % origin_url requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta0 = requests_meta[0] assert request_meta0.raw_metadata == raw_metadata0 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_204_NO_CONTENT requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta1 = requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == atom_dataset["entry-data1"] assert raw_metadata0 != raw_metadata1 assert request_meta0 != request_meta1 # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) == set(requests_archive1) def test_add_archive_to_deposit_is_possible( tmp_path, authenticated_client, deposit_collection, partial_deposit_with_metadata, sample_archive, ): """Add another archive to a deposit return a 201 response """ tmp_path = str(tmp_path) deposit = partial_deposit_with_metadata requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests) == 1 check_archive(sample_archive["name"], requests[0].archive.name) requests_meta0 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta0) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = authenticated_client.post( update_uri, content_type="application/zip", # as zip data=archive2["data"], # + headers CONTENT_LENGTH=archive2["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"],), ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="archive").order_by( "id" ) assert len(requests) == 2 # first archive still exists check_archive(sample_archive["name"], requests[0].archive.name) # a new one was added check_archive(archive2["name"], requests[1].archive.name) # check we did not touch the other parts requests_meta1 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta1) == 1 assert set(requests_meta0) == set(requests_meta1) def test_add_metadata_to_deposit_is_possible( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, deposit_user, ): """Add metadata with another one should return a 204 response """ deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id requests = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests) == 1 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) atom_entry = atom_dataset["entry-data1"] response = authenticated_client.post( update_uri, content_type="application/atom+xml;type=entry", data=atom_entry ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="metadata").order_by( "id" ) assert len(requests) == 2 expected_raw_meta0 = atom_dataset["entry-data0"] % origin_url # a new one was added assert requests[0].raw_metadata == expected_raw_meta0 assert requests[1].raw_metadata == atom_entry # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) == set(requests_archive1) def test_add_both_archive_and_metadata_to_deposit( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, sample_archive, deposit_user, ): """Scenario: Add both a new archive and new metadata to a partial deposit is ok Response: 201 """ deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id requests = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests) == 1 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) archive = InMemoryUploadedFile( BytesIO(sample_archive["data"]), field_name=sample_archive["name"], name=sample_archive["name"], content_type="application/x-tar", size=sample_archive["length"], charset=None, ) data_atom_entry = atom_dataset["entry-data1"] atom_entry = InMemoryUploadedFile( BytesIO(data_atom_entry.encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(data_atom_entry), charset="utf-8", ) update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.post( update_uri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="metadata").order_by( "id" ) assert len(requests) == 1 + 1, "New deposit request archive got added" expected_raw_meta0 = atom_dataset["entry-data0"] % origin_url # a new one was added assert requests[0].raw_metadata == expected_raw_meta0 assert requests[1].raw_metadata == data_atom_entry # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 + 1, "New deposit request metadata got added" def test_post_metadata_empty_post_finalize_deposit_ok( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, ): """Empty atom post entry with header in-progress to false transitions deposit to 'deposited' status Response: 200 """ deposit = partial_deposit_with_metadata assert deposit.status == DEPOSIT_STATUS_PARTIAL update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.post( update_uri, content_type="application/atom+xml;type=entry", data="", size=0, HTTP_IN_PROGRESS=False, ) assert response.status_code == status.HTTP_200_OK deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_add_metadata_to_unknown_deposit( deposit_collection, authenticated_client, atom_dataset ): """Replacing metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 1000 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(SE_IRI, args=[deposit_collection, unknown_deposit_id]) response = authenticated_client.post( url, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit 1000 does not exist" in response_content["sword:error"]["atom:summary"] ) def test_add_metadata_to_unknown_collection( partial_deposit, authenticated_client, atom_dataset ): """Replacing metadata to unknown deposit should return a 404 response """ deposit = partial_deposit unknown_collection_name = "unknown-collection" try: DepositCollection.objects.get(name=unknown_collection_name) except DepositCollection.DoesNotExist: assert True url = reverse(SE_IRI, args=[unknown_collection_name, deposit.id]) response = authenticated_client.post( url, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert "Unknown collection name" in response_content["sword:error"]["atom:summary"] def test_replace_metadata_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 998 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EDIT_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.put( url, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_add_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 997 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.post( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_replace_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Replacing archive to unknown deposit should return a 404 response """ unknown_deposit_id = 996 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.put( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_post_metadata_to_em_iri_failure( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Update (POST) archive with wrong content type should return 400 """ deposit = partial_deposit update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.post( update_uri, content_type="application/x-gtar-compressed", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Packaging format supported is restricted" in response.content for supported_format in ACCEPT_ARCHIVE_CONTENT_TYPES: assert supported_format.encode() in response.content def test_put_metadata_to_em_iri_failure( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Update (PUT) archive with wrong content type should return 400 """ # given deposit = partial_deposit # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Packaging format supported is restricted" in response.content for supported_format in ACCEPT_ARCHIVE_CONTENT_TYPES: assert supported_format.encode() in response.content def test_put_update_metadata_and_archive_deposit_partial_nominal( tmp_path, authenticated_client, partial_deposit_with_metadata, deposit_collection, atom_dataset, sample_archive, deposit_user, ): """Scenario: Replace metadata and archive(s) with new ones should be ok Response: 204 """ # given deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id raw_metadata0 = atom_dataset["entry-data0"] % origin_url requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta0 = requests_meta[0] assert request_meta0.raw_metadata == raw_metadata0 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 archive = InMemoryUploadedFile( BytesIO(sample_archive["data"]), field_name=sample_archive["name"], name=sample_archive["name"], content_type="application/x-tar", size=sample_archive["length"], charset=None, ) data_atom_entry = atom_dataset["entry-data1"] atom_entry = InMemoryUploadedFile( BytesIO(data_atom_entry.encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(data_atom_entry), charset="utf-8", ) update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.put( update_uri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert response.status_code == status.HTTP_204_NO_CONTENT # check we updated the metadata part requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta1 = requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == data_atom_entry assert raw_metadata0 != raw_metadata1 assert request_meta0 != request_meta1 # and the archive part requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) != set(requests_archive1) def test_put_update_metadata_done_deposit_nominal( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, sample_data, swh_storage, ): """Nominal scenario, client send an update of metadata on a deposit with status "done" with an existing swhid. Such swhid has its metadata updated accordingly both in the deposit backend and in the metadata storage. Response: 204 """ deposit_swhid = parse_swhid(complete_deposit.swhid) assert deposit_swhid.object_type == "directory" directory_id = hash_to_bytes(deposit_swhid.object_id) # directory targeted by the complete_deposit does not exist in the storage assert list(swh_storage.directory_missing([directory_id])) == [directory_id] # so let's create a directory reference in the storage (current deposit targets an # unknown swhid) existing_directory = sample_data.directory swh_storage.directory_add([existing_directory]) assert list(swh_storage.directory_missing([existing_directory.id])) == [] # and patch one complete deposit swhid so it targets said reference complete_deposit.swhid = swhid("directory", existing_directory.id) complete_deposit.save() actual_existing_requests_archive = DepositRequest.objects.filter( deposit=complete_deposit, type="archive" ) nb_archives = len(actual_existing_requests_archive) actual_existing_requests_metadata = DepositRequest.objects.filter( deposit=complete_deposit, type="metadata" ) nb_metadata = len(actual_existing_requests_metadata) update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_204_NO_CONTENT new_requests_meta = DepositRequest.objects.filter( deposit=complete_deposit, type="metadata" ) assert len(new_requests_meta) == nb_metadata + 1 request_meta1 = new_requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == atom_dataset["entry-data1"] # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter( deposit=complete_deposit, type="archive" ) assert len(requests_archive1) == nb_archives assert set(actual_existing_requests_archive) == set(requests_archive1) # Ensure metadata stored in the metadata storage is consistent metadata_authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url=complete_deposit.client.provider_url, metadata={"name": complete_deposit.client.last_name}, ) actual_authority = swh_storage.metadata_authority_get( MetadataAuthorityType.DEPOSIT_CLIENT, url=complete_deposit.client.provider_url ) assert actual_authority == metadata_authority config = APIConfig() metadata_fetcher = MetadataFetcher( name=config.tool["name"], version=config.tool["version"], metadata=config.tool["configuration"], ) actual_fetcher = swh_storage.metadata_fetcher_get( config.tool["name"], config.tool["version"] ) assert actual_fetcher == metadata_fetcher directory_swhid = parse_swhid(complete_deposit.swhid) page_results = swh_storage.raw_extrinsic_metadata_get( MetadataTargetType.DIRECTORY, directory_swhid, metadata_authority ) assert page_results == PagedResult( results=[ RawExtrinsicMetadata( type=MetadataTargetType.DIRECTORY, target=directory_swhid, discovery_date=request_meta1.date, authority=attr.evolve(metadata_authority, metadata=None), fetcher=attr.evolve(metadata_fetcher, metadata=None), format="sword-v2-atom-codemeta", metadata=raw_metadata1.encode(), origin=complete_deposit.origin_url, ) ], next_page_token=None, ) def test_put_update_metadata_done_deposit_failure_mismatched_swhid( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit with SWHID not matching the deposit's. Response: 400 """ incorrect_swhid = "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea" assert complete_deposit.swhid != incorrect_swhid update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_X_CHECK_SWHID=incorrect_swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Mismatched provided SWHID" in response.content def test_put_update_metadata_done_deposit_failure_malformed_xml( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done with a malformed xml Response: 400 """ update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-ko"], HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Malformed xml metadata" in response.content def test_put_update_metadata_done_deposit_failure_empty_xml( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done with an empty xml. Response: 400 """ update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) atom_content = atom_dataset["entry-data-empty-body"] response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", data=atom_content, HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Empty body request is not supported" in response.content def test_put_update_metadata_done_deposit_failure_functional_checks( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done without required incomplete metadata Response: 400 """ update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) response = authenticated_client.put( update_uri, content_type="application/atom+xml;type=entry", # no title, nor author, nor name fields data=atom_dataset["entry-data-fail-metadata-functional-checks"], HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Functional metadata checks failure" in response.content # detail on the errors - assert b"- Mandatory fields are missing (atom:author)" in response.content - assert ( - b"- Mandatory alternate fields are missing (atom:name or atom:title)" - in response.content + msg = ( + b"- Mandatory fields are missing (" + b"atom:name or atom:title or codemeta:name, " + b"atom:author or codemeta:author)" ) + assert msg in response.content def test_put_atom_with_create_origin_and_external_identifier( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """ was deprecated before was introduced, clients should get an error when trying to use both """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) response = authenticated_client.post( url, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) for link in response_content["atom:link"]: if link["@rel"] == "edit": edit_iri = link["@href"] break else: assert False, response_content # when response = authenticated_client.put( edit_iri, content_type="application/atom+xml;type=entry", data=atom_dataset["error-with-external-identifier"] % external_id, # + headers HTTP_IN_PROGRESS="false", ) assert b"<external_identifier> is deprecated" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_put_atom_with_create_origin_and_reference( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """ and are mutually exclusive """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) response = authenticated_client.post( url, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) for link in response_content["atom:link"]: if link["@rel"] == "edit": edit_iri = link["@href"] break else: assert False, response_content # when response = authenticated_client.put( edit_iri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-with-origin-reference"].format(url=origin_url), # + headers HTTP_IN_PROGRESS="false", ) assert b"only one may be used on a given deposit" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST