diff --git a/swh/deposit/tests/api/test_collection_add_to_origin.py b/swh/deposit/tests/api/test_collection_add_to_origin.py index 516f1984..34ea37a3 100644 --- a/swh/deposit/tests/api/test_collection_add_to_origin.py +++ b/swh/deposit/tests/api/test_collection_add_to_origin.py @@ -1,157 +1,158 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO from django.urls import reverse from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_LOAD_SUCCESS from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml +from swh.deposit.tests.common import post_atom from ..conftest import create_deposit def test_add_deposit_with_add_to_origin( authenticated_client, deposit_collection, completed_deposit, atom_dataset, deposit_user, ): """Posting deposit with creates a new deposit with parent """ # given multiple deposit already loaded deposit = completed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-with-add-to-origin"] % origin_url, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == new_deposit.collection assert deposit.origin_url == origin_url assert new_deposit != deposit assert new_deposit.parent == deposit def test_add_deposit_add_to_origin_conflict( authenticated_client, another_authenticated_client, deposit_collection, deposit_another_collection, atom_dataset, sample_archive, deposit_user, deposit_another_user, ): """Posting a deposit with an referencing an origin owned by a different client raises an error """ external_id = "foobar" origin_url = deposit_another_user.provider_url + external_id # create a deposit for that other user, with the same slug create_deposit( another_authenticated_client, deposit_another_collection.name, sample_archive, external_id, DEPOSIT_STATUS_LOAD_SUCCESS, ) # adding a new deposit with the same external id as a completed deposit - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, ) assert response.status_code == status.HTTP_403_FORBIDDEN assert b"must start with" in response.content def test_add_deposit_add_to_wrong_origin( authenticated_client, deposit_collection, atom_dataset, sample_archive, ): """Posting a deposit with an referencing an origin not starting with the provider_url raises an error """ origin_url = "http://example.org/foo" # adding a new deposit with the same external id as a completed deposit - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, ) assert response.status_code == status.HTTP_403_FORBIDDEN assert b"must start with" in response.content def test_add_deposit_with_add_to_origin_and_external_identifier( authenticated_client, deposit_collection, completed_deposit, atom_dataset, deposit_user, ): """Posting deposit with creates a new deposit with parent """ # given multiple deposit already loaded origin_url = deposit_user.provider_url + completed_deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-with-both-add-to-origin-and-external-id"] % origin_url, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"<external_identifier> is deprecated." in response.content def test_post_deposit_atom_403_add_to_wrong_origin_url_prefix( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """Creating an origin for a prefix not owned by the client is forbidden """ origin_url = "http://example.org/foo" - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-with-add-to-origin"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_403_FORBIDDEN expected_msg = ( f"Cannot create origin {origin_url}, " f"it must start with {deposit_user.provider_url}" ) assert expected_msg in response.content.decode() diff --git a/swh/deposit/tests/api/test_collection_post_atom.py b/swh/deposit/tests/api/test_collection_post_atom.py index 05d20dbe..dce9e2dc 100644 --- a/swh/deposit/tests/api/test_collection_post_atom.py +++ b/swh/deposit/tests/api/test_collection_post_atom.py @@ -1,637 +1,628 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests the handling of the Atom content when doing a POST Col-IRI.""" from io import BytesIO import uuid import attr from django.urls import reverse import pytest from rest_framework import status from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_LOAD_SUCCESS, APIConfig, ) from swh.deposit.models import Deposit, DepositCollection, DepositRequest from swh.deposit.parsers import parse_xml +from swh.deposit.tests.common import post_atom from swh.deposit.utils import compute_metadata_context from swh.model.identifiers import SWHID, parse_swhid from swh.model.model import ( MetadataAuthority, MetadataAuthorityType, MetadataFetcher, MetadataTargetType, RawExtrinsicMetadata, ) from swh.storage.interface import PagedResult def test_post_deposit_atom_201_even_with_decimal( authenticated_client, deposit_collection, atom_dataset ): """Posting an initial atom entry should return 201 with deposit receipt """ atom_error_with_decimal = atom_dataset["error-with-decimal"] - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_error_with_decimal, HTTP_SLUG="external-id", HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED, response.content.decode() response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) dr = DepositRequest.objects.get(deposit=deposit) assert dr.metadata is not None sw_version = dr.metadata.get("codemeta:softwareVersion") assert sw_version == "10.4" def test_post_deposit_atom_400_with_empty_body( authenticated_client, deposit_collection, atom_dataset ): """Posting empty body request should return a 400 response """ atom_content = atom_dataset["entry-data-empty-body"] - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_content, HTTP_SLUG="external-id", ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Empty body request is not supported" in response.content def test_post_deposit_atom_400_badly_formatted_atom( authenticated_client, deposit_collection, atom_dataset ): """Posting a badly formatted atom should return a 400 response """ - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-badly-formatted"], HTTP_SLUG="external-id", ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Malformed xml metadata" in response.content def test_post_deposit_atom_parsing_error( authenticated_client, deposit_collection, atom_dataset ): """Posting parsing error prone atom should return 400 """ - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-parsing-error-prone"], HTTP_SLUG="external-id", ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Malformed xml metadata" in response.content def test_post_deposit_atom_400_both_create_origin_and_add_to_origin( authenticated_client, deposit_collection, atom_dataset ): """Posting a badly formatted atom should return a 400 response """ - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-with-both-create-origin-and-add-to-origin"], ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( b"<swh:create_origin> and <swh:add_to_origin> " b"are mutually exclusive" ) in response.content def test_post_deposit_atom_403_create_wrong_origin_url_prefix( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """Creating an origin for a prefix not owned by the client is forbidden """ origin_url = "http://example.org/foo" - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_403_FORBIDDEN expected_msg = ( f"Cannot create origin {origin_url}, " f"it must start with {deposit_user.provider_url}" ) assert expected_msg in response.content.decode() def test_post_deposit_atom_use_slug_header( authenticated_client, deposit_collection, deposit_user, atom_dataset, mocker ): """Posting an atom entry with a slug header but no origin url generates an origin url from the slug """ url = reverse(COL_IRI, args=[deposit_collection.name]) slug = str(uuid.uuid4()) # when - response = authenticated_client.post( + response = post_atom( + authenticated_client, url, - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-no-origin-url"], HTTP_IN_PROGRESS="false", HTTP_SLUG=slug, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == deposit_user.provider_url + slug assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_post_deposit_atom_no_origin_url_nor_slug_header( authenticated_client, deposit_collection, deposit_user, atom_dataset, mocker ): """Posting an atom entry without an origin url or a slug header should generate one """ url = reverse(COL_IRI, args=[deposit_collection.name]) slug = str(uuid.uuid4()) mocker.patch("uuid.uuid4", return_value=slug) # when - response = authenticated_client.post( + response = post_atom( + authenticated_client, url, - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-no-origin-url"], - # + headers HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == deposit_user.provider_url + slug assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_post_deposit_atom_with_mismatched_slug_and_external_identifier( authenticated_client, deposit_collection, atom_dataset ): """Posting an atom entry with mismatched slug header and external_identifier should return a 400 """ external_id = "foobar" url = reverse(COL_IRI, args=[deposit_collection.name]) # when - response = authenticated_client.post( + response = post_atom( + authenticated_client, url, - content_type="application/atom+xml;type=entry", data=atom_dataset["error-with-external-identifier"] % external_id, - # + headers HTTP_IN_PROGRESS="false", HTTP_SLUG="something", ) assert b"The 'external_identifier' tag is deprecated" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_atom_with_create_origin_and_external_identifier( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """ was deprecated before was introduced, clients should get an error when trying to use both """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) document = atom_dataset["error-with-external-identifier-and-create-origin"].format( external_id=external_id, url=origin_url, ) # when - response = authenticated_client.post( - url, - content_type="application/atom+xml;type=entry", - data=document, - # + headers - HTTP_IN_PROGRESS="false", + response = post_atom( + authenticated_client, url, data=document, HTTP_IN_PROGRESS="false", ) assert b"<external_identifier> is deprecated" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_atom_with_create_origin_and_reference( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """ and are mutually exclusive """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) document = atom_dataset["error-with-reference-and-create-origin"].format( external_id=external_id, url=origin_url, ) # when - response = authenticated_client.post( - url, - content_type="application/atom+xml;type=entry", - data=document, - # + headers - HTTP_IN_PROGRESS="false", + response = post_atom( + authenticated_client, url, data=document, HTTP_IN_PROGRESS="false", ) assert b"only one may be used on a given deposit" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_atom_unknown_collection(authenticated_client, atom_dataset): """Posting an atom entry to an unknown collection should return a 404 """ unknown_collection = "unknown-one" with pytest.raises(DepositCollection.DoesNotExist): DepositCollection.objects.get(name=unknown_collection) - response = authenticated_client.post( - reverse(COL_IRI, args=[unknown_collection]), # <- unknown collection - content_type="application/atom+xml;type=entry", + response = post_atom( + authenticated_client, + reverse(COL_IRI, args=[unknown_collection]), data=atom_dataset["entry-data0"], HTTP_SLUG="something", ) assert response.status_code == status.HTTP_404_NOT_FOUND assert b"Unknown collection" in response.content def test_post_deposit_atom_entry_initial( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """Posting an initial atom entry should return 201 with deposit receipt """ # given origin_url = deposit_user.provider_url + "1225c695-cfb8-4ebb-aaaa-80da344efa6a" with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(origin_url=origin_url) atom_entry_data = atom_dataset["entry-data0"] % origin_url # when - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_entry_data, HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED, response.content.decode() response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == origin_url assert deposit.status == DEPOSIT_STATUS_DEPOSITED # one associated request to a deposit deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.metadata is not None assert deposit_request.raw_metadata == atom_entry_data assert bool(deposit_request.archive) is False def test_post_deposit_atom_entry_with_codemeta( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """Posting an initial atom entry should return 201 with deposit receipt """ # given origin_url = deposit_user.provider_url + "1225c695-cfb8-4ebb-aaaa-80da344efa6a" with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(origin_url=origin_url) atom_entry_data = atom_dataset["codemeta-sample"] % origin_url # when - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_entry_data, HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == origin_url assert deposit.status == DEPOSIT_STATUS_DEPOSITED # one associated request to a deposit deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.metadata is not None assert deposit_request.raw_metadata == atom_entry_data assert bool(deposit_request.archive) is False def test_deposit_metadata_invalid( authenticated_client, deposit_collection, atom_dataset ): """Posting invalid swhid reference is bad request returned to client """ invalid_swhid = "swh:1:dir :31b5c8cc985d190b5a7ef4878128ebfdc2358f49" xml_data = atom_dataset["entry-data-with-swhid"].format(swhid=invalid_swhid) - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=xml_data, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Invalid SWHID reference" in response.content def test_deposit_metadata_fails_functional_checks( authenticated_client, deposit_collection, atom_dataset ): """Posting functionally invalid metadata swhid is bad request returned to client """ swhid = "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49" invalid_xml_data = atom_dataset[ "entry-data-with-swhid-fail-metadata-functional-checks" ].format(swhid=swhid) - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=invalid_xml_data, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Functional metadata checks failure" in response.content @pytest.mark.parametrize( "swhid,target_type", [ ( "swh:1:cnt:01b5c8cc985d190b5a7ef4878128ebfdc2358f49", MetadataTargetType.CONTENT, ), ( "swh:1:dir:11b5c8cc985d190b5a7ef4878128ebfdc2358f49", MetadataTargetType.DIRECTORY, ), ( "swh:1:rev:21b5c8cc985d190b5a7ef4878128ebfdc2358f49", MetadataTargetType.REVISION, ), ( "swh:1:rel:31b5c8cc985d190b5a7ef4878128ebfdc2358f49", MetadataTargetType.RELEASE, ), ( "swh:1:snp:41b5c8cc985d190b5a7ef4878128ebfdc2358f49", MetadataTargetType.SNAPSHOT, ), ( "swh:1:cnt:51b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=h://g.c/o/repo", MetadataTargetType.CONTENT, ), ( "swh:1:dir:c4993c872593e960dc84e4430dbbfbc34fd706d0;origin=https://inria.halpreprod.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:0175049fc45055a3824a1675ac06e3711619a55a;anchor=swh:1:rev:b5f505b005435fa5c4fa4c279792bd7b17167c04;path=/", # noqa MetadataTargetType.DIRECTORY, ), ( "swh:1:rev:71b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=h://g.c/o/repo", MetadataTargetType.REVISION, ), ( "swh:1:rel:81b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=h://g.c/o/repo", MetadataTargetType.RELEASE, ), ( "swh:1:snp:91b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=h://g.c/o/repo", MetadataTargetType.SNAPSHOT, ), ], ) def test_deposit_metadata_swhid( swhid, target_type, authenticated_client, deposit_collection, atom_dataset, swh_storage, ): """Posting a swhid reference is stored on raw extrinsic metadata storage """ swhid_reference = parse_swhid(swhid) swhid_core = attr.evolve(swhid_reference, metadata={}) xml_data = atom_dataset["entry-data-with-swhid"].format(swhid=swhid) deposit_client = authenticated_client.deposit_client - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=xml_data, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) # Ensure the deposit is finalized deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) assert isinstance(swhid_core, SWHID) assert deposit.swhid == str(swhid_core) assert deposit.swhid_context == str(swhid_reference) assert deposit.complete_date == deposit.reception_date assert deposit.complete_date is not None assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS # Ensure metadata stored in the metadata storage is consistent metadata_authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url=deposit_client.provider_url, metadata={"name": deposit_client.last_name}, ) actual_authority = swh_storage.metadata_authority_get( MetadataAuthorityType.DEPOSIT_CLIENT, url=deposit_client.provider_url ) assert actual_authority == metadata_authority config = APIConfig() metadata_fetcher = MetadataFetcher( name=config.tool["name"], version=config.tool["version"], metadata=config.tool["configuration"], ) actual_fetcher = swh_storage.metadata_fetcher_get( config.tool["name"], config.tool["version"] ) assert actual_fetcher == metadata_fetcher page_results = swh_storage.raw_extrinsic_metadata_get( target_type, swhid_core, metadata_authority ) discovery_date = page_results.results[0].discovery_date assert len(page_results.results) == 1 assert page_results.next_page_token is None object_type, metadata_context = compute_metadata_context(swhid_reference) assert page_results == PagedResult( results=[ RawExtrinsicMetadata( type=object_type, target=swhid_core, discovery_date=discovery_date, authority=attr.evolve(metadata_authority, metadata=None), fetcher=attr.evolve(metadata_fetcher, metadata=None), format="sword-v2-atom-codemeta", metadata=xml_data.encode(), **metadata_context, ) ], next_page_token=None, ) assert deposit.complete_date == discovery_date @pytest.mark.parametrize( "url", ["https://gitlab.org/user/repo", "https://whatever.else/repo",] ) def test_deposit_metadata_origin( url, authenticated_client, deposit_collection, atom_dataset, swh_storage, ): """Posting a swhid reference is stored on raw extrinsic metadata storage """ xml_data = atom_dataset["entry-data-with-origin-reference"].format(url=url) deposit_client = authenticated_client.deposit_client - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=xml_data, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) # Ensure the deposit is finalized deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) # we got not swhid as input so we cannot have those assert deposit.swhid is None assert deposit.swhid_context is None assert deposit.complete_date == deposit.reception_date assert deposit.complete_date is not None assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS # Ensure metadata stored in the metadata storage is consistent metadata_authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url=deposit_client.provider_url, metadata={"name": deposit_client.last_name}, ) actual_authority = swh_storage.metadata_authority_get( MetadataAuthorityType.DEPOSIT_CLIENT, url=deposit_client.provider_url ) assert actual_authority == metadata_authority config = APIConfig() metadata_fetcher = MetadataFetcher( name=config.tool["name"], version=config.tool["version"], metadata=config.tool["configuration"], ) actual_fetcher = swh_storage.metadata_fetcher_get( config.tool["name"], config.tool["version"] ) assert actual_fetcher == metadata_fetcher page_results = swh_storage.raw_extrinsic_metadata_get( MetadataTargetType.ORIGIN, url, metadata_authority ) discovery_date = page_results.results[0].discovery_date assert len(page_results.results) == 1 assert page_results.next_page_token is None assert page_results == PagedResult( results=[ RawExtrinsicMetadata( type=MetadataTargetType.ORIGIN, target=url, discovery_date=discovery_date, authority=attr.evolve(metadata_authority, metadata=None), fetcher=attr.evolve(metadata_fetcher, metadata=None), format="sword-v2-atom-codemeta", metadata=xml_data.encode(), ) ], next_page_token=None, ) assert deposit.complete_date == discovery_date diff --git a/swh/deposit/tests/api/test_collection_reuse_slug.py b/swh/deposit/tests/api/test_collection_reuse_slug.py index 13f7fbaa..fc5156b2 100644 --- a/swh/deposit/tests/api/test_collection_reuse_slug.py +++ b/swh/deposit/tests/api/test_collection_reuse_slug.py @@ -1,241 +1,242 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO from django.urls import reverse from rest_framework import status from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_PARTIAL, SE_IRI, ) from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml +from swh.deposit.tests.common import post_atom from ..conftest import create_deposit def test_act_on_deposit_rejected_is_not_permitted( authenticated_client, deposit_collection, rejected_deposit, atom_dataset ): deposit = rejected_deposit - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(SE_IRI, args=[deposit.collection.name, deposit.id]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_400_BAD_REQUEST msg = "You can only act on deposit with status '%s'" % ( DEPOSIT_STATUS_PARTIAL, ) assert msg in response.content.decode("utf-8") def test_add_deposit_when_partial_makes_new_deposit( authenticated_client, deposit_collection, partial_deposit, atom_dataset, deposit_user, ): """Posting deposit on collection when previous is partial makes new deposit """ deposit = partial_deposit assert deposit.status == DEPOSIT_STATUS_PARTIAL origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED, response.content.decode() response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id # new deposit new_deposit = Deposit.objects.get(pk=deposit_id) assert new_deposit != deposit assert new_deposit.parent is None def test_add_deposit_when_failed_makes_new_deposit_with_no_parent( authenticated_client, deposit_collection, failed_deposit, atom_dataset, deposit_user ): """Posting deposit on collection when deposit done makes new deposit with parent """ deposit = failed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_FAILURE origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert new_deposit != deposit assert new_deposit.parent is None def test_add_deposit_when_done_makes_new_deposit_with_parent_old_one( authenticated_client, deposit_collection, completed_deposit, atom_dataset, deposit_user, ): """Posting deposit on collection when deposit done makes new deposit with parent """ # given multiple deposit already loaded deposit = completed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == new_deposit.collection assert deposit.origin_url == origin_url assert new_deposit != deposit assert new_deposit.parent == deposit def test_add_deposit_external_id_conflict_no_parent( authenticated_client, another_authenticated_client, deposit_collection, deposit_another_collection, atom_dataset, sample_archive, deposit_user, ): """Posting a deposit with an external_id conflicting with an external_id of a different client does not create a parent relationship """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id # create a deposit for that other user, with the same slug other_deposit = create_deposit( another_authenticated_client, deposit_another_collection.name, sample_archive, external_id, DEPOSIT_STATUS_LOAD_SUCCESS, ) # adding a new deposit with the same external id as a completed deposit - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert other_deposit.id != deposit_id new_deposit = Deposit.objects.get(pk=deposit_id) assert new_deposit.parent is None def test_add_deposit_external_id_conflict_with_parent( authenticated_client, another_authenticated_client, deposit_collection, deposit_another_collection, completed_deposit, atom_dataset, sample_archive, deposit_user, ): """Posting a deposit with an external_id conflicting with an external_id of a different client creates a parent relationship with the deposit of the right client instead of the last matching deposit This test does not have an equivalent for origin url conflicts, as these can not happen (assuming clients do not have provider_url overlaps) """ # given multiple deposit already loaded deposit = completed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS origin_url = deposit_user.provider_url + deposit.external_id # create a deposit for that other user, with the same slug other_deposit = create_deposit( another_authenticated_client, deposit_another_collection.name, sample_archive, deposit.external_id, DEPOSIT_STATUS_LOAD_SUCCESS, ) # adding a new deposit with the same external id as a completed deposit - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id assert other_deposit.id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == new_deposit.collection assert deposit.external_id == new_deposit.external_id assert new_deposit != deposit assert new_deposit.parent == deposit diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py index 60904f51..4bb1ff27 100644 --- a/swh/deposit/tests/api/test_deposit_update.py +++ b/swh/deposit/tests/api/test_deposit_update.py @@ -1,183 +1,180 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests updates on SE-IRI.""" from io import BytesIO from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse from rest_framework import status from swh.deposit.config import ( DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_PARTIAL, EDIT_IRI, SE_IRI, ) from swh.deposit.models import Deposit, DepositRequest +from swh.deposit.tests.common import post_atom def test_add_both_archive_and_metadata_to_deposit( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, sample_archive, deposit_user, ): """Scenario: Add both a new archive and new metadata to a partial deposit is ok Response: 201 """ deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id requests = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests) == 1 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) archive = InMemoryUploadedFile( BytesIO(sample_archive["data"]), field_name=sample_archive["name"], name=sample_archive["name"], content_type="application/x-tar", size=sample_archive["length"], charset=None, ) data_atom_entry = atom_dataset["entry-data1"] atom_entry = InMemoryUploadedFile( BytesIO(data_atom_entry.encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(data_atom_entry), charset="utf-8", ) update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.post( update_uri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="metadata").order_by( "id" ) assert len(requests) == 1 + 1, "New deposit request archive got added" expected_raw_meta0 = atom_dataset["entry-data0"] % origin_url # a new one was added assert requests[0].raw_metadata == expected_raw_meta0 assert requests[1].raw_metadata == data_atom_entry # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 + 1, "New deposit request metadata got added" def test_post_metadata_empty_post_finalize_deposit_ok( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, ): """Empty atom post entry with header in-progress to false transitions deposit to 'deposited' status Response: 200 """ deposit = partial_deposit_with_metadata assert deposit.status == DEPOSIT_STATUS_PARTIAL update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) - response = authenticated_client.post( - update_uri, - content_type="application/atom+xml;type=entry", - data="", - size=0, - HTTP_IN_PROGRESS=False, + response = post_atom( + authenticated_client, update_uri, data="", size=0, HTTP_IN_PROGRESS=False, ) assert response.status_code == status.HTTP_200_OK deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_put_update_metadata_and_archive_deposit_partial_nominal( tmp_path, authenticated_client, partial_deposit_with_metadata, deposit_collection, atom_dataset, sample_archive, deposit_user, ): """Scenario: Replace metadata and archive(s) with new ones should be ok Response: 204 """ # given deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id raw_metadata0 = atom_dataset["entry-data0"] % origin_url requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta0 = requests_meta[0] assert request_meta0.raw_metadata == raw_metadata0 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 archive = InMemoryUploadedFile( BytesIO(sample_archive["data"]), field_name=sample_archive["name"], name=sample_archive["name"], content_type="application/x-tar", size=sample_archive["length"], charset=None, ) data_atom_entry = atom_dataset["entry-data1"] atom_entry = InMemoryUploadedFile( BytesIO(data_atom_entry.encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(data_atom_entry), charset="utf-8", ) update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.put( update_uri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert response.status_code == status.HTTP_204_NO_CONTENT # check we updated the metadata part requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta1 = requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == data_atom_entry assert raw_metadata0 != raw_metadata1 assert request_meta0 != request_meta1 # and the archive part requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) != set(requests_archive1) diff --git a/swh/deposit/tests/api/test_deposit_update_atom.py b/swh/deposit/tests/api/test_deposit_update_atom.py index 8dfd2ab2..b903f0ae 100644 --- a/swh/deposit/tests/api/test_deposit_update_atom.py +++ b/swh/deposit/tests/api/test_deposit_update_atom.py @@ -1,630 +1,608 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO import attr from django.urls import reverse import pytest from rest_framework import status from swh.deposit.api.common import ACCEPT_ARCHIVE_CONTENT_TYPES from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, EDIT_IRI, EM_IRI, SE_IRI, APIConfig, ) from swh.deposit.models import Deposit, DepositCollection, DepositRequest from swh.deposit.parsers import parse_xml +from swh.deposit.tests.common import post_atom, put_atom from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import parse_swhid, swhid from swh.model.model import ( MetadataAuthority, MetadataAuthorityType, MetadataFetcher, MetadataTargetType, RawExtrinsicMetadata, ) from swh.storage.interface import PagedResult def test_post_deposit_atom_entry_multiple_steps( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """After initial deposit, updating a deposit should return a 201 """ # given origin_url = deposit_user.provider_url + "2225c695-cfb8-4ebb-aaaa-80da344efa6a" with pytest.raises(Deposit.DoesNotExist): deposit = Deposit.objects.get(origin_url=origin_url) # when - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_IN_PROGRESS="True", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url is None # not provided yet assert deposit.status == "partial" # one associated request to a deposit deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert len(deposit_requests) == 1 atom_entry_data = atom_dataset["entry-only-create-origin"] % (origin_url) for link in response_content["atom:link"]: if link["@rel"] == "http://purl.org/net/sword/terms/add": se_iri = link["@href"] break else: assert False, f"missing SE-IRI from {response_content['link']}" # when updating the first deposit post - response = authenticated_client.post( - se_iri, - content_type="application/atom+xml;type=entry", - data=atom_entry_data, - HTTP_IN_PROGRESS="False", + response = post_atom( + authenticated_client, se_iri, data=atom_entry_data, HTTP_IN_PROGRESS="False", ) # then assert response.status_code == status.HTTP_201_CREATED, response.content.decode() response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == origin_url assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert len(Deposit.objects.all()) == 1 # now 2 associated requests to a same deposit deposit_requests = DepositRequest.objects.filter(deposit=deposit).order_by("id") assert len(deposit_requests) == 2 atom_entry_data1 = atom_dataset["entry-data1"] expected_meta = [ {"metadata": parse_xml(atom_entry_data1), "raw_metadata": atom_entry_data1}, {"metadata": parse_xml(atom_entry_data), "raw_metadata": atom_entry_data}, ] for i, deposit_request in enumerate(deposit_requests): actual_metadata = deposit_request.metadata assert actual_metadata == expected_meta[i]["metadata"] assert deposit_request.raw_metadata == expected_meta[i]["raw_metadata"] assert bool(deposit_request.archive) is False def test_replace_metadata_to_deposit_is_possible( tmp_path, authenticated_client, partial_deposit_with_metadata, deposit_collection, atom_dataset, deposit_user, ): """Replace all metadata with another one should return a 204 response """ # given deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id raw_metadata0 = atom_dataset["entry-data0"] % origin_url requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta0 = requests_meta[0] assert request_meta0.raw_metadata == raw_metadata0 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) - response = authenticated_client.put( - update_uri, - content_type="application/atom+xml;type=entry", - data=atom_dataset["entry-data1"], + response = put_atom( + authenticated_client, update_uri, data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_204_NO_CONTENT requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta1 = requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == atom_dataset["entry-data1"] assert raw_metadata0 != raw_metadata1 assert request_meta0 != request_meta1 # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) == set(requests_archive1) def test_add_metadata_to_deposit_is_possible( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, deposit_user, ): """Add metadata with another one should return a 204 response """ deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id requests = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests) == 1 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) atom_entry = atom_dataset["entry-data1"] - response = authenticated_client.post( - update_uri, content_type="application/atom+xml;type=entry", data=atom_entry - ) + response = post_atom(authenticated_client, update_uri, data=atom_entry) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="metadata").order_by( "id" ) assert len(requests) == 2 expected_raw_meta0 = atom_dataset["entry-data0"] % origin_url # a new one was added assert requests[0].raw_metadata == expected_raw_meta0 assert requests[1].raw_metadata == atom_entry # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) == set(requests_archive1) def test_add_metadata_to_unknown_deposit( deposit_collection, authenticated_client, atom_dataset ): """Replacing metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 1000 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(SE_IRI, args=[deposit_collection, unknown_deposit_id]) - response = authenticated_client.post( - url, - content_type="application/atom+xml;type=entry", - data=atom_dataset["entry-data1"], - ) + response = post_atom(authenticated_client, url, data=atom_dataset["entry-data1"],) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit 1000 does not exist" in response_content["sword:error"]["atom:summary"] ) def test_add_metadata_to_unknown_collection( partial_deposit, authenticated_client, atom_dataset ): """Replacing metadata to unknown deposit should return a 404 response """ deposit = partial_deposit unknown_collection_name = "unknown-collection" try: DepositCollection.objects.get(name=unknown_collection_name) except DepositCollection.DoesNotExist: assert True url = reverse(SE_IRI, args=[unknown_collection_name, deposit.id]) - response = authenticated_client.post( - url, - content_type="application/atom+xml;type=entry", - data=atom_dataset["entry-data1"], - ) + response = post_atom(authenticated_client, url, data=atom_dataset["entry-data1"],) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert "Unknown collection name" in response_content["sword:error"]["atom:summary"] def test_replace_metadata_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 998 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EDIT_IRI, args=[deposit_collection.name, unknown_deposit_id]) - response = authenticated_client.put( - url, - content_type="application/atom+xml;type=entry", - data=atom_dataset["entry-data1"], - ) + response = put_atom(authenticated_client, url, data=atom_dataset["entry-data1"],) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_post_metadata_to_em_iri_failure( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Update (POST) archive with wrong content type should return 400 """ deposit = partial_deposit update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.post( update_uri, content_type="application/x-gtar-compressed", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Packaging format supported is restricted" in response.content for supported_format in ACCEPT_ARCHIVE_CONTENT_TYPES: assert supported_format.encode() in response.content def test_put_metadata_to_em_iri_failure( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Update (PUT) archive with wrong content type should return 400 """ # given deposit = partial_deposit # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) - response = authenticated_client.put( - update_uri, - content_type="application/atom+xml;type=entry", - data=atom_dataset["entry-data1"], + response = put_atom( + authenticated_client, update_uri, data=atom_dataset["entry-data1"], ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Packaging format supported is restricted" in response.content for supported_format in ACCEPT_ARCHIVE_CONTENT_TYPES: assert supported_format.encode() in response.content def test_put_update_metadata_done_deposit_nominal( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, sample_data, swh_storage, ): """Nominal scenario, client send an update of metadata on a deposit with status "done" with an existing swhid. Such swhid has its metadata updated accordingly both in the deposit backend and in the metadata storage. Response: 204 """ deposit_swhid = parse_swhid(complete_deposit.swhid) assert deposit_swhid.object_type == "directory" directory_id = hash_to_bytes(deposit_swhid.object_id) # directory targeted by the complete_deposit does not exist in the storage assert list(swh_storage.directory_missing([directory_id])) == [directory_id] # so let's create a directory reference in the storage (current deposit targets an # unknown swhid) existing_directory = sample_data.directory swh_storage.directory_add([existing_directory]) assert list(swh_storage.directory_missing([existing_directory.id])) == [] # and patch one complete deposit swhid so it targets said reference complete_deposit.swhid = swhid("directory", existing_directory.id) complete_deposit.save() actual_existing_requests_archive = DepositRequest.objects.filter( deposit=complete_deposit, type="archive" ) nb_archives = len(actual_existing_requests_archive) actual_existing_requests_metadata = DepositRequest.objects.filter( deposit=complete_deposit, type="metadata" ) nb_metadata = len(actual_existing_requests_metadata) update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) - response = authenticated_client.put( + response = put_atom( + authenticated_client, update_uri, - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_204_NO_CONTENT new_requests_meta = DepositRequest.objects.filter( deposit=complete_deposit, type="metadata" ) assert len(new_requests_meta) == nb_metadata + 1 request_meta1 = new_requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == atom_dataset["entry-data1"] # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter( deposit=complete_deposit, type="archive" ) assert len(requests_archive1) == nb_archives assert set(actual_existing_requests_archive) == set(requests_archive1) # Ensure metadata stored in the metadata storage is consistent metadata_authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url=complete_deposit.client.provider_url, metadata={"name": complete_deposit.client.last_name}, ) actual_authority = swh_storage.metadata_authority_get( MetadataAuthorityType.DEPOSIT_CLIENT, url=complete_deposit.client.provider_url ) assert actual_authority == metadata_authority config = APIConfig() metadata_fetcher = MetadataFetcher( name=config.tool["name"], version=config.tool["version"], metadata=config.tool["configuration"], ) actual_fetcher = swh_storage.metadata_fetcher_get( config.tool["name"], config.tool["version"] ) assert actual_fetcher == metadata_fetcher directory_swhid = parse_swhid(complete_deposit.swhid) page_results = swh_storage.raw_extrinsic_metadata_get( MetadataTargetType.DIRECTORY, directory_swhid, metadata_authority ) assert page_results == PagedResult( results=[ RawExtrinsicMetadata( type=MetadataTargetType.DIRECTORY, target=directory_swhid, discovery_date=request_meta1.date, authority=attr.evolve(metadata_authority, metadata=None), fetcher=attr.evolve(metadata_fetcher, metadata=None), format="sword-v2-atom-codemeta", metadata=raw_metadata1.encode(), origin=complete_deposit.origin_url, ) ], next_page_token=None, ) def test_put_update_metadata_done_deposit_failure_mismatched_swhid( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit with SWHID not matching the deposit's. Response: 400 """ incorrect_swhid = "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea" assert complete_deposit.swhid != incorrect_swhid update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) - response = authenticated_client.put( + response = put_atom( + authenticated_client, update_uri, - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_X_CHECK_SWHID=incorrect_swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Mismatched provided SWHID" in response.content def test_put_update_metadata_done_deposit_failure_malformed_xml( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done with a malformed xml Response: 400 """ update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) - response = authenticated_client.put( + response = put_atom( + authenticated_client, update_uri, - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-ko"], HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Malformed xml metadata" in response.content def test_put_update_metadata_done_deposit_failure_empty_xml( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done with an empty xml. Response: 400 """ update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) atom_content = atom_dataset["entry-data-empty-body"] - response = authenticated_client.put( + response = put_atom( + authenticated_client, update_uri, - content_type="application/atom+xml;type=entry", data=atom_content, HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Empty body request is not supported" in response.content def test_put_update_metadata_done_deposit_failure_functional_checks( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done without required incomplete metadata Response: 400 """ update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) - response = authenticated_client.put( + response = put_atom( + authenticated_client, update_uri, - content_type="application/atom+xml;type=entry", # no title, nor author, nor name fields data=atom_dataset["entry-data-fail-metadata-functional-checks"], HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Functional metadata checks failure" in response.content # detail on the errors msg = ( b"- Mandatory fields are missing (" b"atom:name or atom:title or codemeta:name, " b"atom:author or codemeta:author)" ) assert msg in response.content def test_put_atom_with_create_origin_and_external_identifier( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """ was deprecated before was introduced, clients should get an error when trying to use both """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) - response = authenticated_client.post( + response = post_atom( + authenticated_client, url, - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) for link in response_content["atom:link"]: if link["@rel"] == "edit": edit_iri = link["@href"] break else: assert False, response_content # when - response = authenticated_client.put( + response = put_atom( + authenticated_client, edit_iri, - content_type="application/atom+xml;type=entry", data=atom_dataset["error-with-external-identifier"] % external_id, - # + headers HTTP_IN_PROGRESS="false", ) assert b"<external_identifier> is deprecated" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_put_atom_with_create_origin_and_reference( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """ and are mutually exclusive """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) - response = authenticated_client.post( + response = post_atom( + authenticated_client, url, - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) for link in response_content["atom:link"]: if link["@rel"] == "edit": edit_iri = link["@href"] break else: assert False, response_content # when - response = authenticated_client.put( + response = put_atom( + authenticated_client, edit_iri, - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-with-origin-reference"].format(url=origin_url), - # + headers HTTP_IN_PROGRESS="false", ) assert b"only one may be used on a given deposit" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST diff --git a/swh/deposit/tests/api/test_deposit_update_binary.py b/swh/deposit/tests/api/test_deposit_update_binary.py index bad6ee65..77cb4388 100644 --- a/swh/deposit/tests/api/test_deposit_update_binary.py +++ b/swh/deposit/tests/api/test_deposit_update_binary.py @@ -1,406 +1,408 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests updates on EM-IRI""" from io import BytesIO from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED, EM_IRI, SE_IRI from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import ( check_archive, create_arborescence_archive, post_archive, + post_atom, put_archive, + put_atom, ) def test_post_deposit_binary_and_post_to_add_another_archive( authenticated_client, deposit_collection, sample_archive, tmp_path ): """Updating a deposit should return a 201 with receipt """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="true", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == "partial" assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit assert deposit_request.type == "archive" check_archive(sample_archive["name"], deposit_request.archive.name) # 2nd archive to upload archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) # uri to update the content update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) # adding another archive for the deposit and finalizing it response = post_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = list( DepositRequest.objects.filter(deposit=deposit).order_by("id") ) # 2 deposit requests for the same deposit assert len(deposit_requests) == 2 assert deposit_requests[0].deposit == deposit assert deposit_requests[0].type == "archive" check_archive(sample_archive["name"], deposit_requests[0].archive.name) assert deposit_requests[1].deposit == deposit assert deposit_requests[1].type == "archive" check_archive(archive2["name"], deposit_requests[1].archive.name) # only 1 deposit in db deposits = Deposit.objects.all() assert len(deposits) == 1 def test_replace_archive_to_deposit_is_possible( tmp_path, partial_deposit, deposit_collection, authenticated_client, sample_archive, atom_dataset, ): """Replace all archive with another one should return a 204 response """ tmp_path = str(tmp_path) # given deposit = partial_deposit requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(sample_archive["name"], requests[0].archive.name) # we have no metadata for that deposit requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 0 - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(SE_IRI, args=[deposit_collection.name, deposit.id]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_SLUG=deposit.external_id, HTTP_IN_PROGRESS=True, ) requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = put_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_204_NO_CONTENT requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(archive2["name"], requests[0].archive.name) # check we did not touch the other parts requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 def test_add_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 997 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.post( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_replace_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Replacing archive to unknown deposit should return a 404 response """ unknown_deposit_id = 996 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.put( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_add_archive_to_deposit_is_possible( tmp_path, authenticated_client, deposit_collection, partial_deposit_with_metadata, sample_archive, ): """Add another archive to a deposit return a 201 response """ tmp_path = str(tmp_path) deposit = partial_deposit_with_metadata requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests) == 1 check_archive(sample_archive["name"], requests[0].archive.name) requests_meta0 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta0) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = post_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="archive").order_by( "id" ) assert len(requests) == 2 # first archive still exists check_archive(sample_archive["name"], requests[0].archive.name) # a new one was added check_archive(archive2["name"], requests[1].archive.name) # check we did not touch the other parts requests_meta1 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta1) == 1 assert set(requests_meta0) == set(requests_meta1) def test_post_deposit_then_update_refused( authenticated_client, deposit_collection, sample_archive, atom_dataset, tmp_path ): """Updating a deposit with status 'ready' should return a 400 """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit check_archive(sample_archive["name"], deposit_request.archive.name) # updating/adding is forbidden # uri to update the content edit_iri = reverse("edit_iri", args=[deposit_collection.name, deposit_id]) se_iri = reverse("se_iri", args=[deposit_collection.name, deposit_id]) em_iri = reverse("em_iri", args=[deposit_collection.name, deposit_id]) # Testing all update/add endpoint should fail # since the status is ready archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file 2" ) # replacing file is no longer possible since the deposit's # status is ready r = put_archive( authenticated_client, em_iri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # adding file is no longer possible since the deposit's status # is ready r = post_archive( authenticated_client, em_iri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # replacing metadata is no longer possible since the deposit's # status is ready - r = authenticated_client.put( + r = put_atom( + authenticated_client, edit_iri, - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # adding new metadata is no longer possible since the # deposit's status is ready - r = authenticated_client.post( + r = post_atom( + authenticated_client, se_iri, - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content archive_content = b"some content representing archive" archive = InMemoryUploadedFile( BytesIO(archive_content), field_name="archive0", name="archive0", content_type="application/zip", size=len(archive_content), charset=None, ) atom_entry = InMemoryUploadedFile( BytesIO(atom_dataset["entry-data-deposit-binary"].encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(atom_dataset["entry-data-deposit-binary"]), charset="utf-8", ) # replacing multipart metadata is no longer possible since the # deposit's status is ready r = authenticated_client.put( edit_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( se_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py index 399f6481..f2999c46 100644 --- a/swh/deposit/tests/common.py +++ b/swh/deposit/tests/common.py @@ -1,165 +1,177 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import os import re import tarfile import tempfile from swh.core import tarball def compute_info(archive_path): """Given a path, compute information on path. """ with open(archive_path, "rb") as f: length = 0 sha1sum = hashlib.sha1() md5sum = hashlib.md5() data = b"" for chunk in f: sha1sum.update(chunk) md5sum.update(chunk) length += len(chunk) data += chunk return { "dir": os.path.dirname(archive_path), "name": os.path.basename(archive_path), "path": archive_path, "length": length, "sha1sum": sha1sum.hexdigest(), "md5sum": md5sum.hexdigest(), "data": data, } def _compress(path, extension, dir_path): """Compress path according to extension """ if extension == "zip" or extension == "tar": return tarball.compress(path, extension, dir_path) elif "." in extension: split_ext = extension.split(".") if split_ext[0] != "tar": raise ValueError( "Development error, only zip or tar archive supported, " "%s not supported" % extension ) # deal with specific tar mode = split_ext[1] supported_mode = ["xz", "gz", "bz2"] if mode not in supported_mode: raise ValueError( "Development error, only %s supported, %s not supported" % (supported_mode, mode) ) files = tarball._ls(dir_path) with tarfile.open(path, "w:%s" % mode) as t: for fpath, fname in files: t.add(fpath, arcname=fname, recursive=False) return path def create_arborescence_archive( root_path, archive_name, filename, content, up_to_size=None, extension="zip" ): """Build an archive named archive_name in the root_path. This archive contains one file named filename with the content content. Args: root_path (str): Location path of the archive to create archive_name (str): Archive's name (without extension) filename (str): Archive's content is only one filename content (bytes): Content of the filename up_to_size (int | None): Fill in the blanks size to oversize or complete an archive's size extension (str): Extension of the archive to write (default is zip) Returns: dict with the keys: - dir: the directory of that archive - path: full path to the archive - sha1sum: archive's sha1sum - length: archive's length """ os.makedirs(root_path, exist_ok=True) archive_path_dir = tempfile.mkdtemp(dir=root_path) dir_path = os.path.join(archive_path_dir, archive_name) os.mkdir(dir_path) filepath = os.path.join(dir_path, filename) _length = len(content) count = 0 batch_size = 128 with open(filepath, "wb") as f: f.write(content) if up_to_size: # fill with blank content up to a given size count += _length while count < up_to_size: f.write(b"0" * batch_size) count += batch_size _path = "%s.%s" % (dir_path, extension) _path = _compress(_path, extension, dir_path) return compute_info(_path) def create_archive_with_archive(root_path, name, archive): """Create an archive holding another. """ invalid_archive_path = os.path.join(root_path, name) with tarfile.open(invalid_archive_path, "w:gz") as _archive: _archive.add(archive["path"], arcname=archive["name"]) return compute_info(invalid_archive_path) def check_archive(archive_name: str, archive_name_to_check: str): """Helper function to ensure archive_name is present within the archive_name_to_check. Raises: AssertionError if archive_name is not present within archive_name_to_check """ ARCHIVE_FILEPATH_PATTERN = re.compile( r"client_[0-9].*/[0-9]{8}-[0-9]{6}\.[0-9]{6}/[a-zA-Z0-9.].*" ) assert ARCHIVE_FILEPATH_PATTERN.match(archive_name_to_check) if "." in archive_name: filename, extension = archive_name.split(".") pattern = re.compile(".*/%s.*\\.%s" % (filename, extension)) else: pattern = re.compile(".*/%s" % archive_name) assert pattern.match(archive_name_to_check) is not None def _post_or_put_archive(f, url, archive, slug=None, in_progress=None, **kwargs): default_kwargs = dict( content_type="application/zip", CONTENT_LENGTH=archive["length"], HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive["name"],), HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", ) kwargs = {**default_kwargs, **kwargs} return f(url, data=archive["data"], HTTP_CONTENT_MD5=archive["md5sum"], **kwargs,) def post_archive(authenticated_client, *args, **kwargs): return _post_or_put_archive(authenticated_client.post, *args, **kwargs) def put_archive(authenticated_client, *args, **kwargs): return _post_or_put_archive(authenticated_client.put, *args, **kwargs) + + +def post_atom(authenticated_client, url, data, **kwargs): + return authenticated_client.post( + url, content_type="application/atom+xml;type=entry", data=data, **kwargs + ) + + +def put_atom(authenticated_client, url, data, **kwargs): + return authenticated_client.put( + url, content_type="application/atom+xml;type=entry", data=data, **kwargs + ) diff --git a/swh/deposit/tests/conftest.py b/swh/deposit/tests/conftest.py index d6b27759..d01970c7 100644 --- a/swh/deposit/tests/conftest.py +++ b/swh/deposit/tests/conftest.py @@ -1,475 +1,479 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 from functools import partial from io import BytesIO import os import re from typing import Mapping from django.test.utils import setup_databases # type: ignore from django.urls import reverse import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import pytest from rest_framework import status from rest_framework.test import APIClient import yaml from swh.core.config import read from swh.core.pytest_plugin import get_response_cb from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_VERIFIED, SE_IRI, setup_django_for, ) from swh.deposit.parsers import parse_xml -from swh.deposit.tests.common import create_arborescence_archive, post_archive +from swh.deposit.tests.common import ( + create_arborescence_archive, + post_archive, + post_atom, +) from swh.model.identifiers import DIRECTORY, REVISION, SNAPSHOT, swhid from swh.scheduler import get_scheduler # mypy is asked to ignore the import statement above because setup_databases # is not part of the d.t.utils.__all__ variable. TEST_USER = { "username": "test", "password": "password", "email": "test@example.org", "provider_url": "https://hal-test.archives-ouvertes.fr/", "domain": "archives-ouvertes.fr/", "collection": {"name": "test"}, } ANOTHER_TEST_USER = { "username": "test2", "password": "password2", "email": "test@example2.org", "provider_url": "https://hal-test.archives-ouvertes.example/", "domain": "archives-ouvertes.example/", "collection": {"name": "another-collection"}, } def pytest_configure(): setup_django_for("testing") @pytest.fixture def requests_mock_datadir(datadir, requests_mock_datadir): """Override default behavior to deal with put/post methods """ cb = partial(get_response_cb, datadir=datadir) requests_mock_datadir.put(re.compile("https://"), body=cb) requests_mock_datadir.post(re.compile("https://"), body=cb) return requests_mock_datadir @pytest.fixture() def deposit_config(swh_scheduler_config, swh_storage_backend_config): return { "max_upload_size": 500, "extraction_dir": "/tmp/swh-deposit/test/extraction-dir", "checks": False, "scheduler": {"cls": "local", **swh_scheduler_config,}, "storage_metadata": swh_storage_backend_config, } @pytest.fixture() def deposit_config_path(tmp_path, monkeypatch, deposit_config): conf_path = os.path.join(tmp_path, "deposit.yml") with open(conf_path, "w") as f: f.write(yaml.dump(deposit_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) return conf_path @pytest.fixture(autouse=True) def deposit_autoconfig(deposit_config_path): """Enforce config for deposit classes inherited from APIConfig.""" cfg = read(deposit_config_path) if "scheduler" in cfg: # scheduler setup: require the check-deposit and load-deposit tasks scheduler = get_scheduler(**cfg["scheduler"]) task_types = [ { "type": "check-deposit", "backend_name": "swh.deposit.loader.tasks.ChecksDepositTsk", "description": "Check deposit metadata/archive before loading", "num_retries": 3, }, { "type": "load-deposit", "backend_name": "swh.loader.package.deposit.tasks.LoadDeposit", "description": "Loading deposit archive into swh archive", "num_retries": 3, }, ] for task_type in task_types: scheduler.create_task_type(task_type) @pytest.fixture(scope="session") def django_db_setup(request, django_db_blocker, postgresql_proc): from django.conf import settings settings.DATABASES["default"].update( { ("ENGINE", "django.db.backends.postgresql"), ("NAME", "tests"), ("USER", postgresql_proc.user), # noqa ("HOST", postgresql_proc.host), # noqa ("PORT", postgresql_proc.port), # noqa } ) with django_db_blocker.unblock(): setup_databases( verbosity=request.config.option.verbose, interactive=False, keepdb=False ) def execute_sql(sql): """Execute sql to postgres db""" with psycopg2.connect(database="postgres") as conn: conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cur = conn.cursor() cur.execute(sql) @pytest.fixture(autouse=True, scope="session") def swh_proxy(): """Automatically inject this fixture in all tests to ensure no outside connection takes place. """ os.environ["http_proxy"] = "http://localhost:999" os.environ["https_proxy"] = "http://localhost:999" def create_deposit_collection(collection_name: str): """Create a deposit collection with name collection_name """ from swh.deposit.models import DepositCollection try: collection = DepositCollection._default_manager.get(name=collection_name) except DepositCollection.DoesNotExist: collection = DepositCollection(name=collection_name) collection.save() return collection def deposit_collection_factory(collection_name=TEST_USER["collection"]["name"]): @pytest.fixture def _deposit_collection(db, collection_name=collection_name): return create_deposit_collection(collection_name) return _deposit_collection deposit_collection = deposit_collection_factory() deposit_another_collection = deposit_collection_factory("another-collection") def _create_deposit_user(db, collection, user_data): """Create/Return the test_user "test" """ from swh.deposit.models import DepositClient try: user = DepositClient._default_manager.get(username=user_data["username"]) except DepositClient.DoesNotExist: user = DepositClient._default_manager.create_user( username=user_data["username"], email=user_data["email"], password=user_data["password"], provider_url=user_data["provider_url"], domain=user_data["domain"], ) user.collections = [collection.id] user.save() return user @pytest.fixture def deposit_user(db, deposit_collection): return _create_deposit_user(db, deposit_collection, TEST_USER) @pytest.fixture def deposit_another_user(db, deposit_another_collection): return _create_deposit_user(db, deposit_another_collection, ANOTHER_TEST_USER) @pytest.fixture def client(): """Override pytest-django one which does not work for djangorestframework. """ return APIClient() # <- drf's client def _create_authenticated_client(client, user, user_data): """Returned a logged client This also patched the client instance to keep a reference on the associated deposit_user. """ _token = "%s:%s" % (user.username, user_data["password"]) token = base64.b64encode(_token.encode("utf-8")) authorization = "Basic %s" % token.decode("utf-8") client.credentials(HTTP_AUTHORIZATION=authorization) client.deposit_client = user yield client client.logout() @pytest.fixture def authenticated_client(client, deposit_user): yield from _create_authenticated_client(client, deposit_user, TEST_USER) @pytest.fixture def another_authenticated_client(deposit_another_user): client = APIClient() yield from _create_authenticated_client( client, deposit_another_user, ANOTHER_TEST_USER ) @pytest.fixture def sample_archive(tmp_path): """Returns a sample archive """ tmp_path = str(tmp_path) # pytest version limitation in previous version archive = create_arborescence_archive( tmp_path, "archive1", "file1", b"some content in file" ) return archive @pytest.fixture def atom_dataset(datadir) -> Mapping[str, str]: """Compute the paths to atom files. Returns: Dict of atom name per content (bytes) """ atom_path = os.path.join(datadir, "atom") data = {} for filename in os.listdir(atom_path): filepath = os.path.join(atom_path, filename) with open(filepath, "rb") as f: raw_content = f.read().decode("utf-8") # Keep the filename without extension atom_name = filename.split(".")[0] data[atom_name] = raw_content return data def create_deposit( authenticated_client, collection_name: str, sample_archive, external_id: str, deposit_status=DEPOSIT_STATUS_DEPOSITED, in_progress=False, ): """Create a skeleton shell deposit """ url = reverse(COL_IRI, args=[collection_name]) # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS=str(in_progress).lower(), ) # then assert response.status_code == status.HTTP_201_CREATED, response.content.decode() from swh.deposit.models import Deposit response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit._default_manager.get(id=deposit_id) if deposit.status != deposit_status: deposit.status = deposit_status deposit.save() assert deposit.status == deposit_status return deposit def create_binary_deposit( authenticated_client, collection_name: str, deposit_status: str = DEPOSIT_STATUS_DEPOSITED, atom_dataset: Mapping[str, bytes] = {}, **kwargs, ): """Create a deposit with both metadata and archive set. Then alters its status to `deposit_status`. """ deposit = create_deposit( authenticated_client, collection_name, deposit_status=DEPOSIT_STATUS_PARTIAL, **kwargs, ) origin_url = deposit.client.provider_url + deposit.external_id - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(SE_IRI, args=[collection_name, deposit.id]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_201_CREATED assert deposit.status == DEPOSIT_STATUS_PARTIAL from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(pk=deposit.id) assert deposit.status == deposit_status return deposit def deposit_factory(deposit_status=DEPOSIT_STATUS_DEPOSITED, in_progress=False): """Build deposit with a specific status """ @pytest.fixture() def _deposit( sample_archive, deposit_collection, authenticated_client, deposit_status=deposit_status, ): external_id = "external-id-%s" % deposit_status return create_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id=external_id, deposit_status=deposit_status, in_progress=in_progress, ) return _deposit deposited_deposit = deposit_factory() rejected_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_REJECTED) partial_deposit = deposit_factory( deposit_status=DEPOSIT_STATUS_PARTIAL, in_progress=True ) verified_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_VERIFIED) completed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS) failed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_FAILURE) @pytest.fixture def partial_deposit_with_metadata( sample_archive, deposit_collection, authenticated_client, atom_dataset ): """Returns deposit with archive and metadata provided, status 'partial' """ return create_binary_deposit( authenticated_client, deposit_collection.name, sample_archive=sample_archive, external_id="external-id-partial", in_progress=True, deposit_status=DEPOSIT_STATUS_PARTIAL, atom_dataset=atom_dataset, ) @pytest.fixture def partial_deposit_only_metadata( deposit_collection, authenticated_client, atom_dataset ): - response = authenticated_client.post( + response = post_atom( + authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), - content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_SLUG="external-id-partial", HTTP_IN_PROGRESS=True, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_id = response_content["swh:deposit_id"] from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_PARTIAL return deposit @pytest.fixture def complete_deposit(sample_archive, deposit_collection, authenticated_client): """Returns a completed deposit (load success) """ deposit = create_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id="external-id-complete", deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS, ) origin = "https://hal.archives-ouvertes.fr/hal-01727745" directory_id = "42a13fc721c8716ff695d0d62fc851d641f3a12b" revision_id = "548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10" snapshot_id = "e5e82d064a9c3df7464223042e0c55d72ccff7f0" deposit.swhid = swhid(DIRECTORY, directory_id) deposit.swhid_context = swhid( DIRECTORY, directory_id, metadata={ "origin": origin, "visit": swhid(SNAPSHOT, snapshot_id), "anchor": swhid(REVISION, revision_id), "path": "/", }, ) deposit.save() return deposit @pytest.fixture() def tmp_path(tmp_path): return str(tmp_path) # issue with oldstable's pytest version