diff --git a/swh/deposit/tests/api/conftest.py b/swh/deposit/tests/api/conftest.py index 3383d95e..422de679 100644 --- a/swh/deposit/tests/api/conftest.py +++ b/swh/deposit/tests/api/conftest.py @@ -1,93 +1,93 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import os -from django.urls import reverse +from django.urls import reverse_lazy as reverse import pytest from swh.deposit.api.private.deposit_check import APIChecks from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_VERIFIED, ) from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml @pytest.fixture def datadir(request): """Override default datadir to target main test datadir""" return os.path.join(os.path.dirname(str(request.fspath)), "../data") @pytest.fixture def ready_deposit_ok(partial_deposit_with_metadata): """Returns a deposit ready for checks (it will pass the checks). """ deposit = partial_deposit_with_metadata deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() return deposit @pytest.fixture def ready_deposit_verified(partial_deposit_with_metadata): """Returns a deposit ready for checks (it will pass the checks). """ deposit = partial_deposit_with_metadata deposit.status = DEPOSIT_STATUS_VERIFIED deposit.save() return deposit @pytest.fixture def ready_deposit_only_metadata(partial_deposit_only_metadata): """Deposit in status ready that will fail the checks (because missing archive). """ deposit = partial_deposit_only_metadata deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() return deposit @pytest.fixture def ready_deposit_invalid_archive(authenticated_client, deposit_collection): url = reverse(COL_IRI, args=[deposit_collection.name]) data = b"some data which is clearly not a zip file" md5sum = hashlib.md5(data).hexdigest() # when response = authenticated_client.post( url, content_type="application/zip", # as zip data=data, # + headers CONTENT_LENGTH=len(data), # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG="external-id-invalid", HTTP_CONTENT_MD5=md5sum, HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) response_content = parse_xml(response.content) deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() return deposit @pytest.fixture def swh_checks_deposit(): return APIChecks() diff --git a/swh/deposit/tests/api/test_collection.py b/swh/deposit/tests/api/test_collection.py index d7c3d0df..4d1beb1a 100644 --- a/swh/deposit/tests/api/test_collection.py +++ b/swh/deposit/tests/api/test_collection.py @@ -1,75 +1,75 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib from io import BytesIO -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_REJECTED from swh.deposit.parsers import parse_xml def test_deposit_post_will_fail_with_401(client): """Without authentication, endpoint refuses access with 401 response """ url = reverse(COL_IRI, args=["hal"]) response = client.post(url) assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_access_to_another_user_collection_is_forbidden( authenticated_client, deposit_another_collection, deposit_user ): """Access to another user collection should return a 403 """ coll2 = deposit_another_collection url = reverse(COL_IRI, args=[coll2.name]) response = authenticated_client.post(url) assert response.status_code == status.HTTP_403_FORBIDDEN msg = "Client %s cannot access collection %s" % (deposit_user.username, coll2.name,) assert msg in response.content.decode("utf-8") def test_delete_on_col_iri_not_supported(authenticated_client, deposit_collection): """Delete on col iri should return a 405 response """ url = reverse(COL_IRI, args=[deposit_collection.name]) response = authenticated_client.delete(url) assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED assert "DELETE method is not supported on this endpoint" in response.content.decode( "utf-8" ) def create_deposit_with_rejection_status(authenticated_client, deposit_collection): url = reverse(COL_IRI, args=[deposit_collection.name]) data = b"some data which is clearly not a zip file" md5sum = hashlib.md5(data).hexdigest() external_id = "some-external-id-1" # when response = authenticated_client.post( url, content_type="application/zip", # as zip data=data, # + headers CONTENT_LENGTH=len(data), # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=md5sum, HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) actual_state = response_content["deposit_status"] assert actual_state == DEPOSIT_STATUS_REJECTED diff --git a/swh/deposit/tests/api/test_collection_add_to_origin.py b/swh/deposit/tests/api/test_collection_add_to_origin.py index 9ba9e401..7af4f398 100644 --- a/swh/deposit/tests/api/test_collection_add_to_origin.py +++ b/swh/deposit/tests/api/test_collection_add_to_origin.py @@ -1,159 +1,159 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_LOAD_SUCCESS from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import post_atom from ..conftest import create_deposit def test_add_deposit_with_add_to_origin( authenticated_client, deposit_collection, completed_deposit, atom_dataset, deposit_user, ): """Posting deposit with creates a new deposit with parent """ # given multiple deposit already loaded deposit = completed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data-with-add-to-origin"] % origin_url, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == new_deposit.collection assert deposit.origin_url == origin_url assert new_deposit != deposit assert new_deposit.parent == deposit assert new_deposit.origin_url == origin_url def test_add_deposit_add_to_origin_conflict( authenticated_client, another_authenticated_client, deposit_collection, deposit_another_collection, atom_dataset, sample_archive, deposit_user, deposit_another_user, ): """Posting a deposit with an referencing an origin owned by a different client raises an error """ external_id = "foobar" origin_url = deposit_another_user.provider_url + external_id # create a deposit for that other user, with the same slug create_deposit( another_authenticated_client, deposit_another_collection.name, sample_archive, external_id, DEPOSIT_STATUS_LOAD_SUCCESS, ) # adding a new deposit with the same external id as a completed deposit response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, ) assert response.status_code == status.HTTP_403_FORBIDDEN assert b"must start with" in response.content def test_add_deposit_add_to_wrong_origin( authenticated_client, deposit_collection, atom_dataset, sample_archive, ): """Posting a deposit with an referencing an origin not starting with the provider_url raises an error """ origin_url = "http://example.org/foo" # adding a new deposit with the same external id as a completed deposit response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, ) assert response.status_code == status.HTTP_403_FORBIDDEN assert b"must start with" in response.content def test_add_deposit_with_add_to_origin_and_external_identifier( authenticated_client, deposit_collection, completed_deposit, atom_dataset, deposit_user, ): """Posting deposit with creates a new deposit with parent """ # given multiple deposit already loaded origin_url = deposit_user.provider_url + completed_deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data-with-both-add-to-origin-and-external-id"] % origin_url, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"<external_identifier> is deprecated" in response.content def test_post_deposit_atom_403_add_to_wrong_origin_url_prefix( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """Creating an origin for a prefix not owned by the client is forbidden """ origin_url = "http://example.org/foo" response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data-with-add-to-origin"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_403_FORBIDDEN expected_msg = ( f"Cannot create origin {origin_url}, " f"it must start with {deposit_user.provider_url}" ) assert expected_msg in response.content.decode() diff --git a/swh/deposit/tests/api/test_collection_post_atom.py b/swh/deposit/tests/api/test_collection_post_atom.py index cd0a7fad..39f98b05 100644 --- a/swh/deposit/tests/api/test_collection_post_atom.py +++ b/swh/deposit/tests/api/test_collection_post_atom.py @@ -1,661 +1,661 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests the handling of the Atom content when doing a POST Col-IRI.""" from io import BytesIO import uuid import attr -from django.urls import reverse +from django.urls import reverse_lazy as reverse import pytest from rest_framework import status from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_LOAD_SUCCESS, APIConfig, ) from swh.deposit.models import Deposit, DepositCollection, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import post_atom from swh.deposit.utils import compute_metadata_context from swh.model.identifiers import SWHID, parse_swhid from swh.model.model import ( MetadataAuthority, MetadataAuthorityType, MetadataFetcher, MetadataTargetType, RawExtrinsicMetadata, ) from swh.storage.interface import PagedResult def test_post_deposit_atom_201_even_with_decimal( authenticated_client, deposit_collection, atom_dataset ): """Posting an initial atom entry should return 201 with deposit receipt """ atom_error_with_decimal = atom_dataset["error-with-decimal"] response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_error_with_decimal, HTTP_SLUG="external-id", HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED, response.content.decode() response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) dr = DepositRequest.objects.get(deposit=deposit) assert dr.metadata is not None sw_version = dr.metadata.get("codemeta:softwareVersion") assert sw_version == "10.4" def test_post_deposit_atom_400_with_empty_body( authenticated_client, deposit_collection, atom_dataset ): """Posting empty body request should return a 400 response """ atom_content = atom_dataset["entry-data-empty-body"] response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_content, HTTP_SLUG="external-id", ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Empty body request is not supported" in response.content def test_post_deposit_atom_400_badly_formatted_atom( authenticated_client, deposit_collection, atom_dataset ): """Posting a badly formatted atom should return a 400 response """ response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data-badly-formatted"], HTTP_SLUG="external-id", ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Malformed xml metadata" in response.content def test_post_deposit_atom_parsing_error( authenticated_client, deposit_collection, atom_dataset ): """Posting parsing error prone atom should return 400 """ response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data-parsing-error-prone"], HTTP_SLUG="external-id", ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Malformed xml metadata" in response.content def test_post_deposit_atom_400_both_create_origin_and_add_to_origin( authenticated_client, deposit_collection, atom_dataset ): """Posting a badly formatted atom should return a 400 response """ response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data-with-both-create-origin-and-add-to-origin"], ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( b"<swh:create_origin> and <swh:add_to_origin> " b"are mutually exclusive" ) in response.content def test_post_deposit_atom_403_create_wrong_origin_url_prefix( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """Creating an origin for a prefix not owned by the client is forbidden """ origin_url = "http://example.org/foo" response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_403_FORBIDDEN expected_msg = ( f"Cannot create origin {origin_url}, " f"it must start with {deposit_user.provider_url}" ) assert expected_msg in response.content.decode() def test_post_deposit_atom_use_slug_header( authenticated_client, deposit_collection, deposit_user, atom_dataset, mocker ): """Posting an atom entry with a slug header but no origin url generates an origin url from the slug """ url = reverse(COL_IRI, args=[deposit_collection.name]) slug = str(uuid.uuid4()) # when response = post_atom( authenticated_client, url, data=atom_dataset["entry-data-no-origin-url"], HTTP_IN_PROGRESS="false", HTTP_SLUG=slug, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == deposit_user.provider_url + slug assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_post_deposit_atom_no_origin_url_nor_slug_header( authenticated_client, deposit_collection, deposit_user, atom_dataset, mocker ): """Posting an atom entry without an origin url or a slug header should generate one """ url = reverse(COL_IRI, args=[deposit_collection.name]) slug = str(uuid.uuid4()) mocker.patch("uuid.uuid4", return_value=slug) # when response = post_atom( authenticated_client, url, data=atom_dataset["entry-data-no-origin-url"], HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == deposit_user.provider_url + slug assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_post_deposit_atom_with_slug_and_external_identifier( authenticated_client, deposit_collection, deposit_user, atom_dataset, mocker ): """Even though is deprecated, it should still be allowed when it matches the slug, so that we don't break existing clients """ url = reverse(COL_IRI, args=[deposit_collection.name]) slug = str(uuid.uuid4()) # when response = post_atom( authenticated_client, url, data=atom_dataset["error-with-external-identifier"] % slug, HTTP_IN_PROGRESS="false", HTTP_SLUG=slug, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == deposit_user.provider_url + slug assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_post_deposit_atom_with_mismatched_slug_and_external_identifier( authenticated_client, deposit_collection, atom_dataset ): """Posting an atom entry with mismatched slug header and external_identifier should return a 400 """ external_id = "foobar" url = reverse(COL_IRI, args=[deposit_collection.name]) # when response = post_atom( authenticated_client, url, data=atom_dataset["error-with-external-identifier"] % external_id, HTTP_IN_PROGRESS="false", HTTP_SLUG="something", ) assert ( b"The <external_identifier> tag and Slug header are deprecated" in response.content ) assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_atom_with_create_origin_and_external_identifier( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """ was deprecated before was introduced, clients should get an error when trying to use both """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) document = atom_dataset["error-with-external-identifier-and-create-origin"].format( external_id=external_id, url=origin_url, ) # when response = post_atom( authenticated_client, url, data=document, HTTP_IN_PROGRESS="false", ) assert b"<external_identifier> is deprecated" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_atom_with_create_origin_and_reference( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """ and are mutually exclusive """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) document = atom_dataset["error-with-reference-and-create-origin"].format( external_id=external_id, url=origin_url, ) # when response = post_atom( authenticated_client, url, data=document, HTTP_IN_PROGRESS="false", ) assert b"only one may be used on a given deposit" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_atom_unknown_collection(authenticated_client, atom_dataset): """Posting an atom entry to an unknown collection should return a 404 """ unknown_collection = "unknown-one" with pytest.raises(DepositCollection.DoesNotExist): DepositCollection.objects.get(name=unknown_collection) response = post_atom( authenticated_client, reverse(COL_IRI, args=[unknown_collection]), data=atom_dataset["entry-data0"], HTTP_SLUG="something", ) assert response.status_code == status.HTTP_404_NOT_FOUND assert b"Unknown collection" in response.content def test_post_deposit_atom_entry_initial( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """Posting an initial atom entry should return 201 with deposit receipt """ # given origin_url = deposit_user.provider_url + "1225c695-cfb8-4ebb-aaaa-80da344efa6a" with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(origin_url=origin_url) atom_entry_data = atom_dataset["entry-data0"] % origin_url # when response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_entry_data, HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED, response.content.decode() response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == origin_url assert deposit.status == DEPOSIT_STATUS_DEPOSITED # one associated request to a deposit deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.metadata is not None assert deposit_request.raw_metadata == atom_entry_data assert bool(deposit_request.archive) is False def test_post_deposit_atom_entry_with_codemeta( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """Posting an initial atom entry should return 201 with deposit receipt """ # given origin_url = deposit_user.provider_url + "1225c695-cfb8-4ebb-aaaa-80da344efa6a" with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(origin_url=origin_url) atom_entry_data = atom_dataset["codemeta-sample"] % origin_url # when response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_entry_data, HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == origin_url assert deposit.status == DEPOSIT_STATUS_DEPOSITED # one associated request to a deposit deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.metadata is not None assert deposit_request.raw_metadata == atom_entry_data assert bool(deposit_request.archive) is False def test_deposit_metadata_invalid( authenticated_client, deposit_collection, atom_dataset ): """Posting invalid swhid reference is bad request returned to client """ invalid_swhid = "swh:1:dir :31b5c8cc985d190b5a7ef4878128ebfdc2358f49" xml_data = atom_dataset["entry-data-with-swhid"].format(swhid=invalid_swhid) response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=xml_data, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Invalid SWHID reference" in response.content def test_deposit_metadata_fails_functional_checks( authenticated_client, deposit_collection, atom_dataset ): """Posting functionally invalid metadata swhid is bad request returned to client """ swhid = "swh:1:dir:31b5c8cc985d190b5a7ef4878128ebfdc2358f49" invalid_xml_data = atom_dataset[ "entry-data-with-swhid-fail-metadata-functional-checks" ].format(swhid=swhid) response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=invalid_xml_data, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Functional metadata checks failure" in response.content @pytest.mark.parametrize( "swhid,target_type", [ ( "swh:1:cnt:01b5c8cc985d190b5a7ef4878128ebfdc2358f49", MetadataTargetType.CONTENT, ), ( "swh:1:dir:11b5c8cc985d190b5a7ef4878128ebfdc2358f49", MetadataTargetType.DIRECTORY, ), ( "swh:1:rev:21b5c8cc985d190b5a7ef4878128ebfdc2358f49", MetadataTargetType.REVISION, ), ( "swh:1:rel:31b5c8cc985d190b5a7ef4878128ebfdc2358f49", MetadataTargetType.RELEASE, ), ( "swh:1:snp:41b5c8cc985d190b5a7ef4878128ebfdc2358f49", MetadataTargetType.SNAPSHOT, ), ( "swh:1:cnt:51b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=h://g.c/o/repo", MetadataTargetType.CONTENT, ), ( "swh:1:dir:c4993c872593e960dc84e4430dbbfbc34fd706d0;origin=https://inria.halpreprod.archives-ouvertes.fr/hal-01243573;visit=swh:1:snp:0175049fc45055a3824a1675ac06e3711619a55a;anchor=swh:1:rev:b5f505b005435fa5c4fa4c279792bd7b17167c04;path=/", # noqa MetadataTargetType.DIRECTORY, ), ( "swh:1:rev:71b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=h://g.c/o/repo", MetadataTargetType.REVISION, ), ( "swh:1:rel:81b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=h://g.c/o/repo", MetadataTargetType.RELEASE, ), ( "swh:1:snp:91b5c8cc985d190b5a7ef4878128ebfdc2358f49;origin=h://g.c/o/repo", MetadataTargetType.SNAPSHOT, ), ], ) def test_deposit_metadata_swhid( swhid, target_type, authenticated_client, deposit_collection, atom_dataset, swh_storage, ): """Posting a swhid reference is stored on raw extrinsic metadata storage """ swhid_reference = parse_swhid(swhid) swhid_core = attr.evolve(swhid_reference, metadata={}) xml_data = atom_dataset["entry-data-with-swhid"].format(swhid=swhid) deposit_client = authenticated_client.deposit_client response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=xml_data, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) # Ensure the deposit is finalized deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) assert isinstance(swhid_core, SWHID) assert deposit.swhid == str(swhid_core) assert deposit.swhid_context == str(swhid_reference) assert deposit.complete_date == deposit.reception_date assert deposit.complete_date is not None assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS # Ensure metadata stored in the metadata storage is consistent metadata_authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url=deposit_client.provider_url, metadata={"name": deposit_client.last_name}, ) actual_authority = swh_storage.metadata_authority_get( MetadataAuthorityType.DEPOSIT_CLIENT, url=deposit_client.provider_url ) assert actual_authority == metadata_authority config = APIConfig() metadata_fetcher = MetadataFetcher( name=config.tool["name"], version=config.tool["version"], metadata=config.tool["configuration"], ) actual_fetcher = swh_storage.metadata_fetcher_get( config.tool["name"], config.tool["version"] ) assert actual_fetcher == metadata_fetcher page_results = swh_storage.raw_extrinsic_metadata_get( target_type, swhid_core, metadata_authority ) discovery_date = page_results.results[0].discovery_date assert len(page_results.results) == 1 assert page_results.next_page_token is None object_type, metadata_context = compute_metadata_context(swhid_reference) assert page_results == PagedResult( results=[ RawExtrinsicMetadata( type=object_type, target=swhid_core, discovery_date=discovery_date, authority=attr.evolve(metadata_authority, metadata=None), fetcher=attr.evolve(metadata_fetcher, metadata=None), format="sword-v2-atom-codemeta", metadata=xml_data.encode(), **metadata_context, ) ], next_page_token=None, ) assert deposit.complete_date == discovery_date @pytest.mark.parametrize( "url", ["https://gitlab.org/user/repo", "https://whatever.else/repo",] ) def test_deposit_metadata_origin( url, authenticated_client, deposit_collection, atom_dataset, swh_storage, ): """Posting a swhid reference is stored on raw extrinsic metadata storage """ xml_data = atom_dataset["entry-data-with-origin-reference"].format(url=url) deposit_client = authenticated_client.deposit_client response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=xml_data, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) # Ensure the deposit is finalized deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) # we got not swhid as input so we cannot have those assert deposit.swhid is None assert deposit.swhid_context is None assert deposit.complete_date == deposit.reception_date assert deposit.complete_date is not None assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS # Ensure metadata stored in the metadata storage is consistent metadata_authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url=deposit_client.provider_url, metadata={"name": deposit_client.last_name}, ) actual_authority = swh_storage.metadata_authority_get( MetadataAuthorityType.DEPOSIT_CLIENT, url=deposit_client.provider_url ) assert actual_authority == metadata_authority config = APIConfig() metadata_fetcher = MetadataFetcher( name=config.tool["name"], version=config.tool["version"], metadata=config.tool["configuration"], ) actual_fetcher = swh_storage.metadata_fetcher_get( config.tool["name"], config.tool["version"] ) assert actual_fetcher == metadata_fetcher page_results = swh_storage.raw_extrinsic_metadata_get( MetadataTargetType.ORIGIN, url, metadata_authority ) discovery_date = page_results.results[0].discovery_date assert len(page_results.results) == 1 assert page_results.next_page_token is None assert page_results == PagedResult( results=[ RawExtrinsicMetadata( type=MetadataTargetType.ORIGIN, target=url, discovery_date=discovery_date, authority=attr.evolve(metadata_authority, metadata=None), fetcher=attr.evolve(metadata_fetcher, metadata=None), format="sword-v2-atom-codemeta", metadata=xml_data.encode(), ) ], next_page_token=None, ) assert deposit.complete_date == discovery_date diff --git a/swh/deposit/tests/api/test_collection_post_binary.py b/swh/deposit/tests/api/test_collection_post_binary.py index a6a6b501..b3e8cacb 100644 --- a/swh/deposit/tests/api/test_collection_post_binary.py +++ b/swh/deposit/tests/api/test_collection_post_binary.py @@ -1,337 +1,339 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests the handling of the binary content when doing a POST Col-IRI.""" from io import BytesIO import uuid -from django.urls import reverse +from django.urls import reverse_lazy as reverse import pytest from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import ( check_archive, create_arborescence_archive, post_archive, ) def test_post_deposit_binary_no_slug( authenticated_client, deposit_collection, sample_archive, deposit_user, mocker ): """Posting a binary deposit without slug header should generate one """ id_ = str(uuid.uuid4()) mocker.patch("uuid.uuid4", return_value=id_) url = reverse(COL_IRI, args=[deposit_collection.name]) # when response = post_archive( authenticated_client, url, sample_archive, in_progress="false", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == deposit_user.provider_url + id_ assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_post_deposit_binary_support( authenticated_client, deposit_collection, sample_archive ): """Binary upload with content-type not in [zip,x-tar] should return 415 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = authenticated_client.post( url, sample_archive, HTTP_SLUG=external_id, content_type="application/octet-stream", HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_ok( authenticated_client, deposit_collection, sample_archive ): """Binary upload with correct headers should return 201 with receipt """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) # then response_content = parse_xml(BytesIO(response.content)) assert response.status_code == status.HTTP_201_CREATED deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) check_archive(sample_archive["name"], deposit_request.archive.name) assert deposit_request.metadata is None assert deposit_request.raw_metadata is None response_content = parse_xml(BytesIO(response.content)) assert response_content["swh:deposit_archive"] == sample_archive["name"] assert int(response_content["swh:deposit_id"]) == deposit.id assert response_content["swh:deposit_status"] == deposit.status # deprecated tags assert response_content["atom:deposit_archive"] == sample_archive["name"] assert int(response_content["atom:deposit_id"]) == deposit.id assert response_content["atom:deposit_status"] == deposit.status - edit_iri = reverse("edit_iri", args=[deposit_collection.name, deposit.id]) + from django.urls import reverse as reverse_strict + + edit_iri = reverse_strict("edit_iri", args=[deposit_collection.name, deposit.id]) assert response._headers["location"] == ( "Location", "http://testserver" + edit_iri, ) def test_post_deposit_binary_failure_unsupported_packaging_header( authenticated_client, deposit_collection, sample_archive ): """Bin deposit without supported content_disposition header returns 400 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_PACKAGING="something-unsupported", ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( b"The packaging provided something-unsupported is not supported" in response.content ) with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_no_content_disposition_header( authenticated_client, deposit_collection, sample_archive ): """Binary upload without content_disposition header should return 400 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION=None, ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"CONTENT_DISPOSITION header is mandatory" in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_mediation_not_supported( authenticated_client, deposit_collection, sample_archive ): """Binary upload with mediation should return a 412 response """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", HTTP_ON_BEHALF_OF="someone", ) # then assert response.status_code == status.HTTP_412_PRECONDITION_FAILED with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded( authenticated_client, deposit_collection, sample_archive, tmp_path ): """Binary upload must not exceed the limit set up... """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) archive = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file", up_to_size=500 ) external_id = "some-external-id" # when response = post_archive( authenticated_client, url, archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE assert b"Upload size limit exceeded" in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_fail_if_content_length_missing( authenticated_client, deposit_collection, sample_archive, tmp_path ): """The Content-Length header is mandatory """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) archive = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file", up_to_size=500 ) external_id = "some-external-id" # when response = post_archive( authenticated_client, url, archive, CONTENT_LENGTH=None, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"the CONTENT_LENGTH header must be sent." in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_2_post_2_different_deposits( authenticated_client, deposit_collection, sample_archive ): """2 posting deposits should return 2 different 201 with receipt """ url = reverse(COL_IRI, args=[deposit_collection.name]) # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG="some-external-id-1", HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) deposits = Deposit.objects.all() assert len(deposits) == 1 assert deposits[0] == deposit # second post response = post_archive( authenticated_client, url, sample_archive, content_type="application/x-tar", HTTP_SLUG="another-external-id", HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id2 = response_content["swh:deposit_id"] deposit2 = Deposit.objects.get(pk=deposit_id2) assert deposit != deposit2 deposits = Deposit.objects.all().order_by("id") assert len(deposits) == 2 assert list(deposits), [deposit == deposit2] diff --git a/swh/deposit/tests/api/test_collection_post_multipart.py b/swh/deposit/tests/api/test_collection_post_multipart.py index 229d66f6..18e6dd61 100644 --- a/swh/deposit/tests/api/test_collection_post_multipart.py +++ b/swh/deposit/tests/api/test_collection_post_multipart.py @@ -1,386 +1,386 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests handling of multipart requests to POST Col-IRI.""" from io import BytesIO import uuid from django.core.files.uploadedfile import InMemoryUploadedFile -from django.urls import reverse +from django.urls import reverse_lazy as reverse import pytest from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import check_archive, post_multipart def test_post_deposit_multipart( authenticated_client, deposit_collection, atom_dataset, mocker, deposit_user, sample_archive, ): # given external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) data_atom_entry = atom_dataset["entry-data0"] % origin_url # when response = post_multipart( authenticated_client, url, sample_archive, data_atom_entry, HTTP_IN_PROGRESS="false", ) print(response.content.decode()) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == origin_url assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_post_deposit_multipart_without_origin_url( authenticated_client, deposit_collection, atom_dataset, mocker, deposit_user, sample_archive, ): # given url = reverse(COL_IRI, args=[deposit_collection.name]) data_atom_entry = atom_dataset["entry-data-deposit-binary"] id_ = str(uuid.uuid4()) mocker.patch("uuid.uuid4", return_value=id_) # when response = post_multipart( authenticated_client, url, sample_archive, data_atom_entry, HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == deposit_user.provider_url + id_ assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_post_deposit_multipart_zip( authenticated_client, deposit_collection, atom_dataset, sample_archive ): """one multipart deposit (zip+xml) should be accepted """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) data_atom_entry = atom_dataset["entry-data-deposit-binary"] external_id = "external-id" # when response = post_multipart( authenticated_client, url, sample_archive, data_atom_entry, HTTP_IN_PROGRESS="false", HTTP_SLUG=external_id, ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert len(deposit_requests) == 2 for deposit_request in deposit_requests: assert deposit_request.deposit == deposit if deposit_request.type == "archive": check_archive(sample_archive["name"], deposit_request.archive.name) assert deposit_request.metadata is None assert deposit_request.raw_metadata is None else: assert ( deposit_request.metadata["atom:id"] == "urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a" ) assert deposit_request.raw_metadata == data_atom_entry def test_post_deposit_multipart_tar( authenticated_client, deposit_collection, atom_dataset, sample_archive ): """one multipart deposit (tar+xml) should be accepted """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) data_atom_entry = atom_dataset["entry-data-deposit-binary"] external_id = "external-id" # when response = post_multipart( authenticated_client, url, sample_archive, data_atom_entry, HTTP_IN_PROGRESS="false", HTTP_SLUG=external_id, ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert len(deposit_requests) == 2 for deposit_request in deposit_requests: assert deposit_request.deposit == deposit if deposit_request.type == "archive": check_archive(sample_archive["name"], deposit_request.archive.name) assert deposit_request.metadata is None assert deposit_request.raw_metadata is None else: assert ( deposit_request.metadata["atom:id"] == "urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a" ) assert deposit_request.raw_metadata == data_atom_entry def test_post_deposit_multipart_put_to_replace_metadata( authenticated_client, deposit_collection, atom_dataset, sample_archive ): """One multipart deposit followed by a metadata update should be accepted """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) data_atom_entry = atom_dataset["entry-data-deposit-binary"] external_id = "external-id" # when response = post_multipart( authenticated_client, url, sample_archive, data_atom_entry, HTTP_IN_PROGRESS="true", HTTP_SLUG=external_id, ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == "partial" assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert len(deposit_requests) == 2 for deposit_request in deposit_requests: assert deposit_request.deposit == deposit if deposit_request.type == "archive": check_archive(sample_archive["name"], deposit_request.archive.name) else: assert ( deposit_request.metadata["atom:id"] == "urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a" ) assert deposit_request.raw_metadata == data_atom_entry replace_metadata_uri = response._headers["location"][1] response = authenticated_client.put( replace_metadata_uri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-deposit-binary"], HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_204_NO_CONTENT # deposit_id did not change deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert len(deposit_requests) == 2 for deposit_request in deposit_requests: assert deposit_request.deposit == deposit if deposit_request.type == "archive": check_archive(sample_archive["name"], deposit_request.archive.name) else: assert ( deposit_request.metadata["atom:id"] == "urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a" ) assert ( deposit_request.raw_metadata == atom_dataset["entry-data-deposit-binary"] ) # FAILURE scenarios def test_post_deposit_multipart_only_archive_and_atom_entry( authenticated_client, deposit_collection ): """Multipart deposit only accepts one archive and one atom+xml""" # given url = reverse(COL_IRI, args=[deposit_collection.name]) archive_content = b"some content representing archive" archive = InMemoryUploadedFile( BytesIO(archive_content), field_name="archive0", name="archive0", content_type="application/x-tar", size=len(archive_content), charset=None, ) other_archive_content = b"some-other-content" other_archive = InMemoryUploadedFile( BytesIO(other_archive_content), field_name="atom0", name="atom0", content_type="application/x-tar", size=len(other_archive_content), charset="utf-8", ) # when response = authenticated_client.post( url, format="multipart", data={"archive": archive, "atom_entry": other_archive,}, # + headers HTTP_IN_PROGRESS="false", HTTP_SLUG="external-id", ) # then assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE assert ( "Only 1 application/zip (or application/x-tar) archive" in response.content.decode("utf-8") ) # when archive.seek(0) response = authenticated_client.post( url, format="multipart", data={"archive": archive,}, # + headers HTTP_IN_PROGRESS="false", HTTP_SLUG="external-id", ) # then assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE assert ( "You must provide both 1 application/zip (or " "application/x-tar) and 1 atom+xml entry for " "multipart deposit" in response.content.decode("utf-8") ) is True def test_post_deposit_multipart_400_when_badly_formatted_xml( authenticated_client, deposit_collection, sample_archive, atom_dataset ): # given url = reverse(COL_IRI, args=[deposit_collection.name]) data_atom_entry_ko = atom_dataset["entry-data-ko"] # when response = post_multipart( authenticated_client, url, sample_archive, data_atom_entry_ko, HTTP_IN_PROGRESS="false", HTTP_SLUG="external-id", ) assert b"Malformed xml metadata" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_multipart_if_upload_size_limit_exceeded( authenticated_client, deposit_collection, atom_dataset, sample_archive ): # given url = reverse(COL_IRI, args=[deposit_collection.name]) archive = { **sample_archive, "data": sample_archive["data"] * 8, } data_atom_entry = atom_dataset["entry-data-deposit-binary"] external_id = "external-id" # when response = post_multipart( authenticated_client, url, archive, data_atom_entry, HTTP_IN_PROGRESS="false", HTTP_SLUG=external_id, ) # then assert response.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE assert b"Upload size limit exceeded" in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) diff --git a/swh/deposit/tests/api/test_collection_reuse_slug.py b/swh/deposit/tests/api/test_collection_reuse_slug.py index 8968dd37..31c0d3b1 100644 --- a/swh/deposit/tests/api/test_collection_reuse_slug.py +++ b/swh/deposit/tests/api/test_collection_reuse_slug.py @@ -1,287 +1,287 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_PARTIAL, SE_IRI, ) from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import post_atom from ..conftest import create_deposit def test_act_on_deposit_rejected_is_not_permitted( authenticated_client, deposit_collection, rejected_deposit, atom_dataset ): deposit = rejected_deposit response = post_atom( authenticated_client, reverse(SE_IRI, args=[deposit.collection.name, deposit.id]), data=atom_dataset["entry-data1"], HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_400_BAD_REQUEST msg = "You can only act on deposit with status '%s'" % ( DEPOSIT_STATUS_PARTIAL, ) assert msg in response.content.decode("utf-8") def test_add_deposit_when_partial_makes_new_deposit( authenticated_client, deposit_collection, partial_deposit, atom_dataset, deposit_user, ): """Posting deposit on collection when previous is partial makes new deposit """ deposit = partial_deposit assert deposit.status == DEPOSIT_STATUS_PARTIAL origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED, response.content.decode() response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id # new deposit new_deposit = Deposit.objects.get(pk=deposit_id) assert new_deposit != deposit assert new_deposit.parent is None assert new_deposit.origin_url == origin_url def test_add_deposit_when_failed_makes_new_deposit_with_no_parent( authenticated_client, deposit_collection, failed_deposit, atom_dataset, deposit_user ): """Posting deposit on collection when deposit done makes new deposit with parent """ deposit = failed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_FAILURE origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert new_deposit != deposit assert new_deposit.parent is None assert new_deposit.origin_url == origin_url def test_add_deposit_when_done_makes_new_deposit_with_parent_old_one( authenticated_client, deposit_collection, completed_deposit, atom_dataset, deposit_user, ): """Posting deposit on collection when deposit done makes new deposit with parent """ # given multiple deposit already loaded deposit = completed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == new_deposit.collection assert deposit.origin_url == origin_url assert new_deposit != deposit assert new_deposit.parent == deposit assert new_deposit.origin_url == origin_url def test_add_deposit_with_external_identifier( authenticated_client, deposit_collection, completed_deposit, atom_dataset, deposit_user, ): """Even though is deprecated, it should still be allowed when it matches the slug, so that we don't break existing clients """ # given multiple deposit already loaded deposit = completed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["error-with-external-identifier"] % deposit.external_id, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == new_deposit.collection assert deposit.origin_url == origin_url assert new_deposit != deposit assert new_deposit.parent == deposit assert new_deposit.origin_url == origin_url def test_add_deposit_external_id_conflict_no_parent( authenticated_client, another_authenticated_client, deposit_collection, deposit_another_collection, atom_dataset, sample_archive, deposit_user, ): """Posting a deposit with an external_id conflicting with an external_id of a different client does not create a parent relationship """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id # create a deposit for that other user, with the same slug other_deposit = create_deposit( another_authenticated_client, deposit_another_collection.name, sample_archive, external_id, DEPOSIT_STATUS_LOAD_SUCCESS, ) # adding a new deposit with the same external id as a completed deposit response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert other_deposit.id != deposit_id new_deposit = Deposit.objects.get(pk=deposit_id) assert new_deposit.parent is None assert new_deposit.origin_url == origin_url def test_add_deposit_external_id_conflict_with_parent( authenticated_client, another_authenticated_client, deposit_collection, deposit_another_collection, completed_deposit, atom_dataset, sample_archive, deposit_user, ): """Posting a deposit with an external_id conflicting with an external_id of a different client creates a parent relationship with the deposit of the right client instead of the last matching deposit This test does not have an equivalent for origin url conflicts, as these can not happen (assuming clients do not have provider_url overlaps) """ # given multiple deposit already loaded deposit = completed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS origin_url = deposit_user.provider_url + deposit.external_id # create a deposit for that other user, with the same slug other_deposit = create_deposit( another_authenticated_client, deposit_another_collection.name, sample_archive, deposit.external_id, DEPOSIT_STATUS_LOAD_SUCCESS, ) # adding a new deposit with the same external id as a completed deposit response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id assert other_deposit.id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == new_deposit.collection assert deposit.external_id == new_deposit.external_id assert new_deposit != deposit assert new_deposit.parent == deposit assert new_deposit.origin_url == origin_url diff --git a/swh/deposit/tests/api/test_delete.py b/swh/deposit/tests/api/test_delete.py index 1a1dcf76..80479df1 100644 --- a/swh/deposit/tests/api/test_delete.py +++ b/swh/deposit/tests/api/test_delete.py @@ -1,131 +1,131 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from typing import Dict, Mapping -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.config import ( ARCHIVE_KEY, DEPOSIT_STATUS_DEPOSITED, EDIT_IRI, EM_IRI, METADATA_KEY, ) from swh.deposit.models import Deposit, DepositRequest def count_deposit_request_types(deposit_requests) -> Mapping[str, int]: deposit_request_types = defaultdict(int) # type: Dict[str, int] for dr in deposit_requests: deposit_request_types[dr.type] += 1 return deposit_request_types def test_delete_archive_on_partial_deposit_works( authenticated_client, partial_deposit_with_metadata, deposit_collection ): """Removing partial deposit's archive should return a 204 response """ deposit_id = partial_deposit_with_metadata.id deposit = Deposit.objects.get(pk=deposit_id) deposit_requests = DepositRequest.objects.filter(deposit=deposit) # deposit request type: 'archive', 1 'metadata' deposit_request_types = count_deposit_request_types(deposit_requests) assert deposit_request_types == {ARCHIVE_KEY: 1, METADATA_KEY: 1} # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_204_NO_CONTENT deposit = Deposit.objects.get(pk=deposit_id) deposit_requests2 = DepositRequest.objects.filter(deposit=deposit) deposit_request_types = count_deposit_request_types(deposit_requests2) assert deposit_request_types == {METADATA_KEY: 1} def test_delete_archive_on_undefined_deposit_fails( authenticated_client, deposit_collection, sample_archive ): """Delete undefined deposit returns a 404 response """ # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, 999]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_404_NOT_FOUND def test_delete_non_partial_deposit( authenticated_client, deposit_collection, deposited_deposit ): """Delete !partial status deposit should return a 400 response """ deposit = deposited_deposit assert deposit.status == DEPOSIT_STATUS_DEPOSITED # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( b"You can only act on deposit with status 'partial'" in response.content ) deposit = Deposit.objects.get(pk=deposit.id) assert deposit is not None def test_delete_partial_deposit( authenticated_client, deposit_collection, partial_deposit ): """Delete deposit should return a 204 response """ # given deposit = partial_deposit # when url = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(url) # then assert response.status_code == status.HTTP_204_NO_CONTENT deposit_requests = list(DepositRequest.objects.filter(deposit=deposit)) assert deposit_requests == [] deposits = list(Deposit.objects.filter(pk=deposit.id)) assert deposits == [] def test_delete_on_edit_iri_cannot_delete_non_partial_deposit( authenticated_client, deposit_collection, complete_deposit ): """Delete !partial deposit should return a 400 response """ # given deposit = complete_deposit # when url = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(url) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( b"You can only act on deposit with status 'partial'" in response.content ) deposit = Deposit.objects.get(pk=deposit.id) assert deposit is not None diff --git a/swh/deposit/tests/api/test_deposit_list.py b/swh/deposit/tests/api/test_deposit_list.py index 3de52950..013a6493 100644 --- a/swh/deposit/tests/api/test_deposit_list.py +++ b/swh/deposit/tests/api/test_deposit_list.py @@ -1,100 +1,100 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.api.converters import convert_status_detail from swh.deposit.config import ( DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_PARTIAL, PRIVATE_LIST_DEPOSITS, ) STATUS_DETAIL = { "url": { "summary": "At least one compatible url field. Failed", "fields": ["testurl"], }, "metadata": [{"summary": "Mandatory fields missing", "fields": ["9", 10, 1.212],},], "archive": [ {"summary": "Invalid archive", "fields": ["3"],}, {"summary": "Unsupported archive", "fields": [2],}, ], } def test_deposit_list(partial_deposit, deposited_deposit, authenticated_client): """Deposit list api should return all deposits in a paginated way """ partial_deposit.status_detail = STATUS_DETAIL partial_deposit.save() deposit_id = partial_deposit.id deposit_id2 = deposited_deposit.id # NOTE: does not work as documented # https://docs.djangoproject.com/en/1.11/ref/urlresolvers/#django.core.urlresolvers.reverse # noqa # url = reverse(PRIVATE_LIST_DEPOSITS, kwargs={'page_size': 1}) main_url = reverse(PRIVATE_LIST_DEPOSITS) url = "%s?page_size=1" % main_url response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["count"] == 2 # 2 deposits expected_next = f"{main_url}?page=2&page_size=1" assert data["next"].endswith(expected_next) is True assert data["previous"] is None assert len(data["results"]) == 1 # page of size 1 deposit = data["results"][0] assert deposit["id"] == deposit_id assert deposit["status"] == DEPOSIT_STATUS_PARTIAL expected_status_detail = convert_status_detail(STATUS_DETAIL) assert deposit["status_detail"] == expected_status_detail # then 2nd page response2 = authenticated_client.get(expected_next) assert response2.status_code == status.HTTP_200_OK data2 = response2.json() assert data2["count"] == 2 # still 2 deposits assert data2["next"] is None expected_previous = f"{main_url}?page_size=1" assert data2["previous"].endswith(expected_previous) is True assert len(data2["results"]) == 1 # page of size 1 deposit2 = data2["results"][0] assert deposit2["id"] == deposit_id2 assert deposit2["status"] == DEPOSIT_STATUS_DEPOSITED def test_deposit_list_exclude(partial_deposit, deposited_deposit, authenticated_client): """Exclusion pattern on external_id should be respected """ partial_deposit.status_detail = STATUS_DETAIL partial_deposit.save() main_url = reverse(PRIVATE_LIST_DEPOSITS) # Testing exclusion pattern exclude_pattern = "external-id" assert partial_deposit.external_id.startswith(exclude_pattern) assert deposited_deposit.external_id.startswith(exclude_pattern) url = f"{main_url}?page_size=1&exclude=external-id" response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["count"] == 0 url = "%s?page_size=1&exclude=dummy" % main_url # that won't exclude anything response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["count"] == 2 diff --git a/swh/deposit/tests/api/test_deposit_private_check.py b/swh/deposit/tests/api/test_deposit_private_check.py index b89868c5..5fe2d3b5 100644 --- a/swh/deposit/tests/api/test_deposit_private_check.py +++ b/swh/deposit/tests/api/test_deposit_private_check.py @@ -1,211 +1,211 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.urls import reverse +from django.urls import reverse_lazy as reverse import pytest from rest_framework import status from swh.deposit.api.checks import MANDATORY_FIELDS_MISSING from swh.deposit.api.private.deposit_check import ( MANDATORY_ARCHIVE_INVALID, MANDATORY_ARCHIVE_MISSING, MANDATORY_ARCHIVE_UNSUPPORTED, ) from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT, ) from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import ( create_arborescence_archive, create_archive_with_archive, ) PRIVATE_CHECK_DEPOSIT_NC = PRIVATE_CHECK_DEPOSIT + "-nc" def private_check_url_endpoints(collection, deposit): """There are 2 endpoints to check (one with collection, one without)""" return [ reverse(PRIVATE_CHECK_DEPOSIT, args=[collection.name, deposit.id]), reverse(PRIVATE_CHECK_DEPOSIT_NC, args=[deposit.id]), ] @pytest.mark.parametrize("extension", ["zip", "tar", "tar.gz", "tar.bz2", "tar.xz"]) def test_deposit_ok( authenticated_client, deposit_collection, ready_deposit_ok, extension ): """Proper deposit should succeed the checks (-> status ready) """ deposit = ready_deposit_ok for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_VERIFIED deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_VERIFIED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() @pytest.mark.parametrize("extension", ["zip", "tar", "tar.gz", "tar.bz2", "tar.xz"]) def test_deposit_invalid_tarball( tmp_path, authenticated_client, deposit_collection, extension ): """Deposit with tarball (of 1 tarball) should fail the checks: rejected """ deposit = create_deposit_archive_with_archive( tmp_path, extension, authenticated_client, deposit_collection.name ) for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_REJECTED details = data["details"] # archive checks failure assert len(details["archive"]) == 1 assert details["archive"][0]["summary"] == MANDATORY_ARCHIVE_INVALID deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED def test_deposit_ko_missing_tarball( authenticated_client, deposit_collection, ready_deposit_only_metadata ): """Deposit without archive should fail the checks: rejected """ deposit = ready_deposit_only_metadata assert deposit.status == DEPOSIT_STATUS_DEPOSITED for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_REJECTED details = data["details"] # archive checks failure assert len(details["archive"]) == 1 assert details["archive"][0]["summary"] == MANDATORY_ARCHIVE_MISSING deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def test_deposit_ko_unsupported_tarball( tmp_path, authenticated_client, deposit_collection, ready_deposit_invalid_archive ): """Deposit with an unsupported tarball should fail the checks: rejected """ deposit = ready_deposit_invalid_archive assert DEPOSIT_STATUS_DEPOSITED == deposit.status for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_REJECTED details = data["details"] # archive checks failure assert len(details["archive"]) == 1 assert details["archive"][0]["summary"] == MANDATORY_ARCHIVE_UNSUPPORTED # metadata check failure assert len(details["metadata"]) == 1 mandatory = details["metadata"][0] assert mandatory["summary"] == MANDATORY_FIELDS_MISSING assert set(mandatory["fields"]) == set( [ "atom:author or codemeta:author", "atom:name or atom:title or codemeta:name", ] ) deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def test_check_deposit_metadata_ok( authenticated_client, deposit_collection, ready_deposit_ok ): """Proper deposit should succeed the checks (-> status ready) with all **MUST** metadata using the codemeta metadata test set """ deposit = ready_deposit_ok assert deposit.status == DEPOSIT_STATUS_DEPOSITED for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_VERIFIED deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_VERIFIED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def create_deposit_archive_with_archive( root_path, archive_extension, client, collection_name ): # we create the holding archive to a given extension archive = create_arborescence_archive( root_path, "archive1", "file1", b"some content in file", extension=archive_extension, ) # now we create an archive holding the first created archive invalid_archive = create_archive_with_archive(root_path, "invalid.tgz", archive) # we deposit it response = client.post( reverse(COL_IRI, args=[collection_name]), content_type="application/x-tar", data=invalid_archive["data"], CONTENT_LENGTH=invalid_archive["length"], HTTP_MD5SUM=invalid_archive["md5sum"], HTTP_SLUG="external-id", HTTP_IN_PROGRESS=False, HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (invalid_archive["name"],), ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_status = response_content["swh:deposit_status"] assert deposit_status == DEPOSIT_STATUS_DEPOSITED deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) assert DEPOSIT_STATUS_DEPOSITED == deposit.status return deposit diff --git a/swh/deposit/tests/api/test_deposit_private_read_archive.py b/swh/deposit/tests/api/test_deposit_private_read_archive.py index 55a07733..3fd8c3ae 100644 --- a/swh/deposit/tests/api/test_deposit_private_read_archive.py +++ b/swh/deposit/tests/api/test_deposit_private_read_archive.py @@ -1,106 +1,106 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from os.path import exists, join import tarfile -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.api.private.deposit_read import aggregate_tarballs from swh.deposit.config import EM_IRI, PRIVATE_GET_RAW_CONTENT from swh.deposit.tests.common import create_arborescence_archive PRIVATE_GET_RAW_CONTENT_NC = PRIVATE_GET_RAW_CONTENT + "-nc" def private_get_raw_url_endpoints(collection, deposit): """There are 2 endpoints to check (one with collection, one without)""" return [ reverse(PRIVATE_GET_RAW_CONTENT, args=[collection.name, deposit.id]), reverse(PRIVATE_GET_RAW_CONTENT_NC, args=[deposit.id]), ] def test_access_to_existing_deposit_with_one_archive( authenticated_client, deposit_collection, complete_deposit, sample_archive, tmp_path, ): """Access to deposit should stream a 200 response with its raw content """ deposit = complete_deposit for i, url in enumerate(private_get_raw_url_endpoints(deposit_collection, deposit)): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK assert response._headers["content-type"][1] == "application/tar" # write the response stream in a temporary archive archive_path = join(tmp_path, f"archive_{i}.tar") with open(archive_path, "wb") as f: for chunk in response.streaming_content: f.write(chunk) # to check its properties are correct tfile = tarfile.open(archive_path) assert set(tfile.getnames()) == {".", "./file1"} assert tfile.extractfile("./file1").read() == b"some content in file" def test_access_to_existing_deposit_with_multiple_archives( tmp_path, authenticated_client, deposit_collection, partial_deposit, sample_archive ): """Access to deposit should stream a 200 response with its raw contents """ deposit = partial_deposit archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) # Add a second archive to deposit update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.post( update_uri, content_type="application/zip", # as zip data=archive2["data"], # + headers CONTENT_LENGTH=archive2["length"], HTTP_SLUG=deposit.external_id, HTTP_CONTENT_MD5=archive2["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"],), ) assert response.status_code == status.HTTP_201_CREATED for i, url in enumerate(private_get_raw_url_endpoints(deposit_collection, deposit)): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK assert response._headers["content-type"][1] == "application/tar" # write the response stream in a temporary archive archive_path = join(tmp_path, f"archive_{i}.tar") with open(archive_path, "wb") as f: for chunk in response.streaming_content: f.write(chunk) # to check its properties are correct tfile = tarfile.open(archive_path) assert set(tfile.getnames()) == {".", "./file1", "./file2"} assert tfile.extractfile("./file1").read() == b"some content in file" assert tfile.extractfile("./file2").read() == b"some other content in file" def test_aggregate_tarballs_with_strange_archive(datadir, tmp_path): archive = join(datadir, "archives", "single-artifact-package.tar.gz") with aggregate_tarballs(tmp_path, [archive]) as tarball_path: assert exists(tarball_path) diff --git a/swh/deposit/tests/api/test_deposit_private_read_metadata.py b/swh/deposit/tests/api/test_deposit_private_read_metadata.py index 5700101a..c13c5ab3 100644 --- a/swh/deposit/tests/api/test_deposit_private_read_metadata.py +++ b/swh/deposit/tests/api/test_deposit_private_read_metadata.py @@ -1,398 +1,398 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit import __version__, utils from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA, SE_IRI, SWH_PERSON from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml PRIVATE_GET_DEPOSIT_METADATA_NC = PRIVATE_GET_DEPOSIT_METADATA + "-nc" def private_get_raw_url_endpoints(collection, deposit): """There are 2 endpoints to check (one with collection, one without)""" deposit_id = deposit if isinstance(deposit, int) else deposit.id return [ reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[collection.name, deposit_id]), reverse(PRIVATE_GET_DEPOSIT_METADATA_NC, args=[deposit_id]), ] def update_deposit_with_metadata(authenticated_client, collection, deposit, metadata): # update deposit's metadata response = authenticated_client.post( reverse(SE_IRI, args=[collection.name, deposit.id]), content_type="application/atom+xml;type=entry", data=metadata, HTTP_SLUG=deposit.external_id, HTTP_IN_PROGRESS=True, ) assert response.status_code == status.HTTP_201_CREATED return deposit def test_read_metadata( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Private metadata read api to existing deposit should return metadata """ deposit = partial_deposit deposit.external_id = "some-external-id" deposit.origin_url = f"https://hal-test.archives-ouvertes.fr/{deposit.external_id}" deposit.save() metadata_xml_atoms = [ atom_dataset[atom_key] for atom_key in ["entry-data2", "entry-data3"] ] metadata_xml_raws = [parse_xml(xml) for xml in metadata_xml_atoms] for atom_xml in metadata_xml_atoms: deposit = update_deposit_with_metadata( authenticated_client, deposit_collection, deposit, atom_xml, ) for url in private_get_raw_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK assert response._headers["content-type"][1] == "application/json" actual_data = response.json() assert actual_data == { "origin": { "type": "deposit", "url": "https://hal-test.archives-ouvertes.fr/some-external-id", }, "metadata_raw": metadata_xml_atoms, "metadata_dict": utils.merge(*metadata_xml_raws), "provider": { "metadata": {}, "provider_name": "", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", }, "tool": { "configuration": {"sword_version": "2"}, "name": "swh-deposit", "version": __version__, }, "deposit": { "author": SWH_PERSON, "committer": SWH_PERSON, "committer_date": { "negative_utc": False, "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1507389428}, }, "author_date": { "negative_utc": False, "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1507389428}, }, "client": "test", "id": deposit.id, "collection": "test", "revision_parents": [], }, } def test_read_metadata_revision_with_parent( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Private read metadata to a deposit (with parent) returns metadata """ deposit = partial_deposit deposit.external_id = "some-external-id" deposit.origin_url = f"https://hal-test.archives-ouvertes.fr/{deposit.external_id}" deposit.save() metadata_xml_atoms = [ atom_dataset[atom_key] for atom_key in ["entry-data2", "entry-data3"] ] metadata_xml_raws = [parse_xml(xml) for xml in metadata_xml_atoms] for atom_xml in metadata_xml_atoms: deposit = update_deposit_with_metadata( authenticated_client, deposit_collection, deposit, atom_xml, ) rev_id = "da78a9d4cf1d5d29873693fd496142e3a18c20fa" swhid = "swh:1:rev:%s" % rev_id fake_parent = Deposit( swhid=swhid, client=deposit.client, collection=deposit.collection ) fake_parent.save() deposit.parent = fake_parent deposit.save() for url in private_get_raw_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK assert response._headers["content-type"][1] == "application/json" actual_data = response.json() assert actual_data == { "origin": { "type": "deposit", "url": "https://hal-test.archives-ouvertes.fr/some-external-id", }, "metadata_raw": metadata_xml_atoms, "metadata_dict": utils.merge(*metadata_xml_raws), "provider": { "metadata": {}, "provider_name": "", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", }, "tool": { "configuration": {"sword_version": "2"}, "name": "swh-deposit", "version": __version__, }, "deposit": { "author": SWH_PERSON, "committer": SWH_PERSON, "committer_date": { "negative_utc": False, "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1507389428}, }, "author_date": { "negative_utc": False, "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1507389428}, }, "client": "test", "id": deposit.id, "collection": "test", "revision_parents": [rev_id], }, } def test_read_metadata_3( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """date(Created|Published) provided, uses author/committer date """ deposit = partial_deposit deposit.external_id = "hal-01243065" deposit.origin_url = f"https://hal-test.archives-ouvertes.fr/{deposit.external_id}" deposit.save() # add metadata to the deposit with datePublished and dateCreated codemeta_entry_data = ( atom_dataset["metadata"] % """ 2015-04-06T17:08:47+02:00 2017-05-03T16:08:47+02:00 """ ) metadata_xml_atoms = [ atom_dataset["entry-data2"], atom_dataset["entry-data3"], codemeta_entry_data, ] metadata_xml_raws = [parse_xml(xml) for xml in metadata_xml_atoms] for atom_xml in metadata_xml_atoms: update_deposit_with_metadata( authenticated_client, deposit_collection, deposit, atom_xml, ) for url in private_get_raw_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK assert response._headers["content-type"][1] == "application/json" actual_data = response.json() assert actual_data == { "origin": { "type": "deposit", "url": "https://hal-test.archives-ouvertes.fr/hal-01243065", }, "metadata_raw": metadata_xml_atoms, "metadata_dict": utils.merge(*metadata_xml_raws), "provider": { "metadata": {}, "provider_name": "", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", }, "tool": { "configuration": {"sword_version": "2"}, "name": "swh-deposit", "version": __version__, }, "deposit": { "author": SWH_PERSON, "committer": SWH_PERSON, "committer_date": { "negative_utc": False, "offset": 120, "timestamp": {"microseconds": 0, "seconds": 1493820527}, }, "author_date": { "negative_utc": False, "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1507389428}, }, "client": deposit_collection.name, "id": deposit.id, "collection": deposit_collection.name, "revision_parents": [], }, } def test_read_metadata_4( authenticated_client, deposit_collection, atom_dataset, partial_deposit ): """dateCreated/datePublished not provided, revision uses complete_date """ deposit = partial_deposit codemeta_entry_data = atom_dataset["metadata"] % "" deposit = update_deposit_with_metadata( authenticated_client, deposit_collection, deposit, codemeta_entry_data ) # will use the deposit completed date as fallback date deposit.complete_date = "2016-04-06" deposit.save() for url in private_get_raw_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK assert response._headers["content-type"][1] == "application/json" actual_data = response.json() assert actual_data == { "origin": {"type": "deposit", "url": None,}, "metadata_raw": [codemeta_entry_data], "metadata_dict": parse_xml(codemeta_entry_data), "provider": { "metadata": {}, "provider_name": "", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", }, "tool": { "configuration": {"sword_version": "2"}, "name": "swh-deposit", "version": __version__, }, "deposit": { "author": SWH_PERSON, "committer": SWH_PERSON, "committer_date": { "negative_utc": False, "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1459900800}, }, "author_date": { "negative_utc": False, "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1459900800}, }, "client": deposit_collection.name, "id": deposit.id, "collection": deposit_collection.name, "revision_parents": [], }, } def test_read_metadata_5( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """dateCreated/datePublished provided, revision uses author/committer date If multiple dateCreated provided, the first occurrence (of dateCreated) is selected. If multiple datePublished provided, the first occurrence (of datePublished) is selected. """ deposit = partial_deposit # add metadata to the deposit with multiple datePublished/dateCreated codemeta_entry_data = ( atom_dataset["metadata"] % """ 2015-04-06T17:08:47+02:00 2017-05-03T16:08:47+02:00 2016-04-06T17:08:47+02:00 2018-05-03T16:08:47+02:00 """ ) deposit = update_deposit_with_metadata( authenticated_client, deposit_collection, deposit, codemeta_entry_data ) for url in private_get_raw_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK assert response._headers["content-type"][1] == "application/json" actual_data = response.json() assert actual_data == { "origin": { "type": "deposit", "url": "https://hal-test.archives-ouvertes.fr/hal-01243065", }, "metadata_raw": [codemeta_entry_data], "metadata_dict": parse_xml(codemeta_entry_data), "provider": { "metadata": {}, "provider_name": "", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", }, "tool": { "configuration": {"sword_version": "2"}, "name": "swh-deposit", "version": __version__, }, "deposit": { "author": SWH_PERSON, "committer": SWH_PERSON, "committer_date": { "negative_utc": False, "offset": 120, "timestamp": {"microseconds": 0, "seconds": 1493820527}, }, "author_date": { "negative_utc": False, "offset": 120, "timestamp": {"microseconds": 0, "seconds": 1428332927}, }, "client": deposit_collection.name, "id": deposit.id, "collection": deposit_collection.name, "revision_parents": [], }, } def test_access_to_nonexisting_deposit_returns_404_response( authenticated_client, deposit_collection, ): """Read unknown collection should return a 404 response """ unknown_id = 999 try: Deposit.objects.get(pk=unknown_id) except Deposit.DoesNotExist: assert True for url in private_get_raw_url_endpoints(deposit_collection, unknown_id): response = authenticated_client.get(url) assert response.status_code == status.HTTP_404_NOT_FOUND msg = "Deposit %s does not exist" % unknown_id assert msg in response.content.decode("utf-8") diff --git a/swh/deposit/tests/api/test_deposit_private_update_status.py b/swh/deposit/tests/api/test_deposit_private_update_status.py index ec34465c..a47ced46 100644 --- a/swh/deposit/tests/api/test_deposit_private_update_status.py +++ b/swh/deposit/tests/api/test_deposit_private_update_status.py @@ -1,196 +1,196 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import json -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.api.private.deposit_update_status import MANDATORY_KEYS from swh.deposit.config import ( DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_SUCCESS, PRIVATE_PUT_DEPOSIT, ) from swh.deposit.models import Deposit from swh.model.identifiers import DIRECTORY, REVISION, SNAPSHOT, swhid PRIVATE_PUT_DEPOSIT_NC = PRIVATE_PUT_DEPOSIT + "-nc" def private_check_url_endpoints(collection, deposit): """There are 2 endpoints to check (one with collection, one without)""" return [ reverse(PRIVATE_PUT_DEPOSIT, args=[collection.name, deposit.id]), reverse(PRIVATE_PUT_DEPOSIT_NC, args=[deposit.id]), ] def test_update_deposit_status_success_with_info( authenticated_client, deposit_collection, ready_deposit_verified ): """Update deposit with load success should require all information to succeed """ deposit = ready_deposit_verified expected_status = DEPOSIT_STATUS_LOAD_SUCCESS origin_url = "something" directory_id = "42a13fc721c8716ff695d0d62fc851d641f3a12b" revision_id = "47dc6b4636c7f6cba0df83e3d5490bf4334d987e" snapshot_id = "68c0d26104d47e278dd6be07ed61fafb561d0d20" full_body_info = { "status": DEPOSIT_STATUS_LOAD_SUCCESS, "revision_id": revision_id, "directory_id": directory_id, "snapshot_id": snapshot_id, "origin_url": origin_url, } for url in private_check_url_endpoints(deposit_collection, deposit): dir_id = swhid(DIRECTORY, directory_id) rev_id = swhid(REVISION, revision_id) snp_id = swhid(SNAPSHOT, snapshot_id) expected_swhid = "swh:1:dir:%s" % directory_id expected_swhid_context = ( f"{dir_id};origin={origin_url};" + f"visit={snp_id};anchor={rev_id};path=/" ) response = authenticated_client.put( url, content_type="application/json", data=json.dumps(full_body_info), ) assert response.status_code == status.HTTP_204_NO_CONTENT deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == expected_status assert deposit.swhid == expected_swhid assert deposit.swhid_context == expected_swhid_context # Reset deposit deposit = ready_deposit_verified deposit.save() def test_update_deposit_status_rejected_with_info( authenticated_client, deposit_collection, ready_deposit_verified ): """Update deposit with rejected status needs few information to succeed """ deposit = ready_deposit_verified for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.put( url, content_type="application/json", data=json.dumps({"status": DEPOSIT_STATUS_LOAD_FAILURE}), ) assert response.status_code == status.HTTP_204_NO_CONTENT deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_LOAD_FAILURE assert deposit.swhid is None assert deposit.swhid_context is None # Reset status deposit = ready_deposit_verified deposit.save() def test_update_deposit_status_success_with_incomplete_data( authenticated_client, deposit_collection, ready_deposit_verified ): """Update deposit status with status success and incomplete information should fail """ deposit = ready_deposit_verified origin_url = "something" directory_id = "42a13fc721c8716ff695d0d62fc851d641f3a12b" revision_id = "47dc6b4636c7f6cba0df83e3d5490bf4334d987e" snapshot_id = "68c0d26104d47e278dd6be07ed61fafb561d0d20" new_status = DEPOSIT_STATUS_LOAD_SUCCESS full_body_info = { "status": new_status, "revision_id": revision_id, "directory_id": directory_id, "snapshot_id": snapshot_id, "origin_url": origin_url, } for url in private_check_url_endpoints(deposit_collection, deposit): for key in MANDATORY_KEYS: # Crafting body with missing information so that it raises body = copy.deepcopy(full_body_info) body.pop(key) # make the body incomplete response = authenticated_client.put( url, content_type="application/json", data=json.dumps(body), ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( f"deposit status to {new_status} requires information {key}" in response.content.decode("utf-8") ) def test_update_deposit_status_will_fail_with_unknown_status( authenticated_client, deposit_collection, ready_deposit_verified ): """Unknown status for update should return a 400 response """ deposit = ready_deposit_verified for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.put( url, content_type="application/json", data=json.dumps({"status": "unknown"}) ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Possible status in " in response.content def test_update_deposit_status_will_fail_with_no_status_key( authenticated_client, deposit_collection, ready_deposit_verified ): """No status provided for update should return a 400 response """ deposit = ready_deposit_verified for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.put( url, content_type="application/json", data=json.dumps({"something": "something"}), ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"The status key is mandatory with possible values" in response.content def test_update_deposit_status_success_without_swhid_fail( authenticated_client, deposit_collection, ready_deposit_verified ): """Providing successful status without swhid should return a 400 """ deposit = ready_deposit_verified for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.put( url, content_type="application/json", data=json.dumps({"status": DEPOSIT_STATUS_LOAD_SUCCESS}), ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( b"Updating deposit status to done requires information" in response.content ) diff --git a/swh/deposit/tests/api/test_deposit_schedule.py b/swh/deposit/tests/api/test_deposit_schedule.py index 9270c9f5..fd3a2493 100644 --- a/swh/deposit/tests/api/test_deposit_schedule.py +++ b/swh/deposit/tests/api/test_deposit_schedule.py @@ -1,133 +1,133 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime from io import BytesIO -from django.urls import reverse +from django.urls import reverse_lazy as reverse import pytest from rest_framework import status from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_PARTIAL, SE_IRI, ) from swh.deposit.parsers import parse_xml @pytest.fixture() def deposit_config(deposit_config): """Overrides the `deposit_config` fixture define in swh/deposit/tests/conftest.py to re-enable the checks.""" config_d = copy.deepcopy(deposit_config) config_d["checks"] = True return config_d def now() -> datetime.datetime: return datetime.datetime.now(tz=datetime.timezone.utc) def assert_task_for_deposit( swh_scheduler, deposit_id, timestamp_before_call, timestamp_after_call ): tasks = swh_scheduler.grab_ready_tasks("check-deposit") assert len(tasks) == 1 task = tasks[0] assert timestamp_before_call <= task.pop("next_run") <= timestamp_after_call assert task["arguments"] == { "args": [], "kwargs": {"collection": "test", "deposit_id": deposit_id,}, } assert task["policy"] == "oneshot" assert task["type"] == "check-deposit" assert task["retries_left"] == 3 def test_add_deposit_schedules_check( authenticated_client, deposit_collection, sample_archive, swh_scheduler ): """Posting deposit by POST Col-IRI creates a checker task """ tasks = swh_scheduler.grab_ready_tasks("check-deposit") assert len(tasks) == 0 external_id = "external-id-schedules-check" url = reverse(COL_IRI, args=[deposit_collection.name]) timestamp_before_call = now() response = authenticated_client.post( url, content_type="application/zip", # as zip data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (sample_archive["name"]), ) timestamp_after_call = now() assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) actual_state = response_content["swh:deposit_status"] assert actual_state == DEPOSIT_STATUS_DEPOSITED deposit_id = int(response_content["swh:deposit_id"]) assert_task_for_deposit( swh_scheduler, deposit_id, timestamp_before_call, timestamp_after_call ) def test_update_deposit_schedules_check( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, swh_scheduler, ): """Updating deposit by POST SE-IRI creates a checker task """ deposit = partial_deposit_with_metadata assert deposit.status == DEPOSIT_STATUS_PARTIAL tasks = swh_scheduler.grab_ready_tasks("check-deposit") assert len(tasks) == 0 update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) timestamp_before_call = now() response = authenticated_client.post( update_uri, content_type="application/atom+xml;type=entry", data="", size=0, HTTP_IN_PROGRESS=False, ) timestamp_after_call = now() assert response.status_code == status.HTTP_200_OK response_content = parse_xml(BytesIO(response.content)) actual_state = response_content["swh:deposit_status"] assert actual_state == DEPOSIT_STATUS_DEPOSITED assert deposit.id == int(response_content["swh:deposit_id"]) assert_task_for_deposit( swh_scheduler, deposit.id, timestamp_before_call, timestamp_after_call ) diff --git a/swh/deposit/tests/api/test_deposit_state.py b/swh/deposit/tests/api/test_deposit_state.py index 6688cca9..856fd9d3 100644 --- a/swh/deposit/tests/api/test_deposit_state.py +++ b/swh/deposit/tests/api/test_deposit_state.py @@ -1,127 +1,127 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.config import ( DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED, STATE_IRI, ) from swh.deposit.models import DEPOSIT_STATUS_DETAIL, DEPOSIT_STATUS_LOAD_SUCCESS from swh.deposit.parsers import parse_xml def test_post_deposit_with_status_check(authenticated_client, deposited_deposit): """Successful but not loaded deposit should have a status 'deposited' """ deposit = deposited_deposit status_url = reverse(STATE_IRI, args=[deposit.collection.name, deposit.id]) # check status status_response = authenticated_client.get(status_url) assert status_response.status_code == status.HTTP_200_OK r = parse_xml(BytesIO(status_response.content)) assert int(r["swh:deposit_id"]) == deposit.id assert r["swh:deposit_status"] == DEPOSIT_STATUS_DEPOSITED assert ( r["swh:deposit_status_detail"] == DEPOSIT_STATUS_DETAIL[DEPOSIT_STATUS_DEPOSITED] ) assert r["swh:deposit_external_id"] == deposit.external_id assert r["swh:deposit_origin_url"] == deposit.origin_url def test_status_unknown_deposit(authenticated_client, deposit_collection): """Unknown deposit status should return 404 response """ unknown_deposit_id = 999 status_url = reverse(STATE_IRI, args=[deposit_collection.name, unknown_deposit_id]) status_response = authenticated_client.get(status_url) assert status_response.status_code == status.HTTP_404_NOT_FOUND def test_status_unknown_collection(authenticated_client, deposited_deposit): """Unknown collection status should return 404 response""" deposit = deposited_deposit unknown_collection = "something-unknown" status_url = reverse(STATE_IRI, args=[unknown_collection, deposit.id]) status_response = authenticated_client.get(status_url) assert status_response.status_code == status.HTTP_404_NOT_FOUND def test_status_deposit_rejected(authenticated_client, rejected_deposit): """Rejected deposit status should be 'rejected' with detailed summary """ deposit = rejected_deposit # _status_detail = {'url': {'summary': 'Wrong url'}} url = reverse(STATE_IRI, args=[deposit.collection.name, deposit.id]) # when status_response = authenticated_client.get(url) # then assert status_response.status_code == status.HTTP_200_OK r = parse_xml(BytesIO(status_response.content)) assert int(r["swh:deposit_id"]) == deposit.id assert r["swh:deposit_status"] == DEPOSIT_STATUS_REJECTED assert r["swh:deposit_status_detail"] == "Deposit failed the checks" if deposit.swhid: assert r["swh:deposit_swhid"] == deposit.swhid def test_status_with_http_accept_header_should_not_break( authenticated_client, partial_deposit ): """Asking deposit status with Accept header should return 200 """ deposit = partial_deposit status_url = reverse(STATE_IRI, args=[deposit.collection.name, deposit.id]) response = authenticated_client.get(status_url) assert response.status_code == status.HTTP_200_OK response = authenticated_client.get( status_url, HTTP_ACCEPT="text/html,application/xml;q=9,*/*,q=8" ) assert response.status_code == status.HTTP_200_OK def test_status_complete_deposit(authenticated_client, complete_deposit): """Successful and loaded deposit should be 'done' and have detailed swh ids """ deposit = complete_deposit url = reverse(STATE_IRI, args=[deposit.collection.name, deposit.id]) # when status_response = authenticated_client.get(url) # then assert status_response.status_code == status.HTTP_200_OK r = parse_xml(BytesIO(status_response.content)) assert int(r["swh:deposit_id"]) == deposit.id assert r["swh:deposit_status"] == DEPOSIT_STATUS_LOAD_SUCCESS assert ( r["swh:deposit_status_detail"] == DEPOSIT_STATUS_DETAIL[DEPOSIT_STATUS_LOAD_SUCCESS] ) assert deposit.swhid is not None assert r["swh:deposit_swh_id"] == deposit.swhid assert deposit.swhid_context is not None assert r["swh:deposit_swh_id_context"] == deposit.swhid_context assert r["swh:deposit_origin_url"] == deposit.origin_url diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py index ec83a3f8..828c9d8a 100644 --- a/swh/deposit/tests/api/test_deposit_update.py +++ b/swh/deposit/tests/api/test_deposit_update.py @@ -1,140 +1,140 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests updates on SE-IRI.""" -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.config import ( DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_PARTIAL, EDIT_IRI, SE_IRI, ) from swh.deposit.models import Deposit, DepositRequest from swh.deposit.tests.common import post_atom, post_multipart, put_multipart def test_add_both_archive_and_metadata_to_deposit( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, sample_archive, deposit_user, ): """Scenario: Add both a new archive and new metadata to a partial deposit is ok Response: 201 """ deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id requests = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests) == 1 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 data_atom_entry = atom_dataset["entry-data1"] response = post_multipart( authenticated_client, reverse(SE_IRI, args=[deposit_collection.name, deposit.id]), sample_archive, data_atom_entry, ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="metadata").order_by( "id" ) assert len(requests) == 1 + 1, "New deposit request archive got added" expected_raw_meta0 = atom_dataset["entry-data0"] % origin_url # a new one was added assert requests[0].raw_metadata == expected_raw_meta0 assert requests[1].raw_metadata == data_atom_entry # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 + 1, "New deposit request metadata got added" def test_post_metadata_empty_post_finalize_deposit_ok( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, ): """Empty atom post entry with header in-progress to false transitions deposit to 'deposited' status Response: 200 """ deposit = partial_deposit_with_metadata assert deposit.status == DEPOSIT_STATUS_PARTIAL update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) response = post_atom( authenticated_client, update_uri, data="", size=0, HTTP_IN_PROGRESS=False, ) assert response.status_code == status.HTTP_200_OK deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_put_update_metadata_and_archive_deposit_partial_nominal( tmp_path, authenticated_client, partial_deposit_with_metadata, deposit_collection, atom_dataset, sample_archive, deposit_user, ): """Scenario: Replace metadata and archive(s) with new ones should be ok Response: 204 """ # given deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id raw_metadata0 = atom_dataset["entry-data0"] % origin_url requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta0 = requests_meta[0] assert request_meta0.raw_metadata == raw_metadata0 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 data_atom_entry = atom_dataset["entry-data1"] response = put_multipart( authenticated_client, reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]), sample_archive, data_atom_entry, ) assert response.status_code == status.HTTP_204_NO_CONTENT # check we updated the metadata part requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta1 = requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == data_atom_entry assert raw_metadata0 != raw_metadata1 assert request_meta0 != request_meta1 # and the archive part requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) != set(requests_archive1) diff --git a/swh/deposit/tests/api/test_deposit_update_atom.py b/swh/deposit/tests/api/test_deposit_update_atom.py index b903f0ae..81004ae0 100644 --- a/swh/deposit/tests/api/test_deposit_update_atom.py +++ b/swh/deposit/tests/api/test_deposit_update_atom.py @@ -1,608 +1,608 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO import attr -from django.urls import reverse +from django.urls import reverse_lazy as reverse import pytest from rest_framework import status from swh.deposit.api.common import ACCEPT_ARCHIVE_CONTENT_TYPES from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, EDIT_IRI, EM_IRI, SE_IRI, APIConfig, ) from swh.deposit.models import Deposit, DepositCollection, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import post_atom, put_atom from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import parse_swhid, swhid from swh.model.model import ( MetadataAuthority, MetadataAuthorityType, MetadataFetcher, MetadataTargetType, RawExtrinsicMetadata, ) from swh.storage.interface import PagedResult def test_post_deposit_atom_entry_multiple_steps( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """After initial deposit, updating a deposit should return a 201 """ # given origin_url = deposit_user.provider_url + "2225c695-cfb8-4ebb-aaaa-80da344efa6a" with pytest.raises(Deposit.DoesNotExist): deposit = Deposit.objects.get(origin_url=origin_url) # when response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data1"], HTTP_IN_PROGRESS="True", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url is None # not provided yet assert deposit.status == "partial" # one associated request to a deposit deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert len(deposit_requests) == 1 atom_entry_data = atom_dataset["entry-only-create-origin"] % (origin_url) for link in response_content["atom:link"]: if link["@rel"] == "http://purl.org/net/sword/terms/add": se_iri = link["@href"] break else: assert False, f"missing SE-IRI from {response_content['link']}" # when updating the first deposit post response = post_atom( authenticated_client, se_iri, data=atom_entry_data, HTTP_IN_PROGRESS="False", ) # then assert response.status_code == status.HTTP_201_CREATED, response.content.decode() response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == origin_url assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert len(Deposit.objects.all()) == 1 # now 2 associated requests to a same deposit deposit_requests = DepositRequest.objects.filter(deposit=deposit).order_by("id") assert len(deposit_requests) == 2 atom_entry_data1 = atom_dataset["entry-data1"] expected_meta = [ {"metadata": parse_xml(atom_entry_data1), "raw_metadata": atom_entry_data1}, {"metadata": parse_xml(atom_entry_data), "raw_metadata": atom_entry_data}, ] for i, deposit_request in enumerate(deposit_requests): actual_metadata = deposit_request.metadata assert actual_metadata == expected_meta[i]["metadata"] assert deposit_request.raw_metadata == expected_meta[i]["raw_metadata"] assert bool(deposit_request.archive) is False def test_replace_metadata_to_deposit_is_possible( tmp_path, authenticated_client, partial_deposit_with_metadata, deposit_collection, atom_dataset, deposit_user, ): """Replace all metadata with another one should return a 204 response """ # given deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id raw_metadata0 = atom_dataset["entry-data0"] % origin_url requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta0 = requests_meta[0] assert request_meta0.raw_metadata == raw_metadata0 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) response = put_atom( authenticated_client, update_uri, data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_204_NO_CONTENT requests_meta = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta) == 1 request_meta1 = requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == atom_dataset["entry-data1"] assert raw_metadata0 != raw_metadata1 assert request_meta0 != request_meta1 # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) == set(requests_archive1) def test_add_metadata_to_deposit_is_possible( authenticated_client, deposit_collection, partial_deposit_with_metadata, atom_dataset, deposit_user, ): """Add metadata with another one should return a 204 response """ deposit = partial_deposit_with_metadata origin_url = deposit_user.provider_url + deposit.external_id requests = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests) == 1 requests_archive0 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive0) == 1 update_uri = reverse(SE_IRI, args=[deposit_collection.name, deposit.id]) atom_entry = atom_dataset["entry-data1"] response = post_atom(authenticated_client, update_uri, data=atom_entry) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="metadata").order_by( "id" ) assert len(requests) == 2 expected_raw_meta0 = atom_dataset["entry-data0"] % origin_url # a new one was added assert requests[0].raw_metadata == expected_raw_meta0 assert requests[1].raw_metadata == atom_entry # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests_archive1) == 1 assert set(requests_archive0) == set(requests_archive1) def test_add_metadata_to_unknown_deposit( deposit_collection, authenticated_client, atom_dataset ): """Replacing metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 1000 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(SE_IRI, args=[deposit_collection, unknown_deposit_id]) response = post_atom(authenticated_client, url, data=atom_dataset["entry-data1"],) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit 1000 does not exist" in response_content["sword:error"]["atom:summary"] ) def test_add_metadata_to_unknown_collection( partial_deposit, authenticated_client, atom_dataset ): """Replacing metadata to unknown deposit should return a 404 response """ deposit = partial_deposit unknown_collection_name = "unknown-collection" try: DepositCollection.objects.get(name=unknown_collection_name) except DepositCollection.DoesNotExist: assert True url = reverse(SE_IRI, args=[unknown_collection_name, deposit.id]) response = post_atom(authenticated_client, url, data=atom_dataset["entry-data1"],) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert "Unknown collection name" in response_content["sword:error"]["atom:summary"] def test_replace_metadata_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 998 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EDIT_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = put_atom(authenticated_client, url, data=atom_dataset["entry-data1"],) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_post_metadata_to_em_iri_failure( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Update (POST) archive with wrong content type should return 400 """ deposit = partial_deposit update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.post( update_uri, content_type="application/x-gtar-compressed", data=atom_dataset["entry-data1"], ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Packaging format supported is restricted" in response.content for supported_format in ACCEPT_ARCHIVE_CONTENT_TYPES: assert supported_format.encode() in response.content def test_put_metadata_to_em_iri_failure( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Update (PUT) archive with wrong content type should return 400 """ # given deposit = partial_deposit # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = put_atom( authenticated_client, update_uri, data=atom_dataset["entry-data1"], ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Packaging format supported is restricted" in response.content for supported_format in ACCEPT_ARCHIVE_CONTENT_TYPES: assert supported_format.encode() in response.content def test_put_update_metadata_done_deposit_nominal( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, sample_data, swh_storage, ): """Nominal scenario, client send an update of metadata on a deposit with status "done" with an existing swhid. Such swhid has its metadata updated accordingly both in the deposit backend and in the metadata storage. Response: 204 """ deposit_swhid = parse_swhid(complete_deposit.swhid) assert deposit_swhid.object_type == "directory" directory_id = hash_to_bytes(deposit_swhid.object_id) # directory targeted by the complete_deposit does not exist in the storage assert list(swh_storage.directory_missing([directory_id])) == [directory_id] # so let's create a directory reference in the storage (current deposit targets an # unknown swhid) existing_directory = sample_data.directory swh_storage.directory_add([existing_directory]) assert list(swh_storage.directory_missing([existing_directory.id])) == [] # and patch one complete deposit swhid so it targets said reference complete_deposit.swhid = swhid("directory", existing_directory.id) complete_deposit.save() actual_existing_requests_archive = DepositRequest.objects.filter( deposit=complete_deposit, type="archive" ) nb_archives = len(actual_existing_requests_archive) actual_existing_requests_metadata = DepositRequest.objects.filter( deposit=complete_deposit, type="metadata" ) nb_metadata = len(actual_existing_requests_metadata) update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) response = put_atom( authenticated_client, update_uri, data=atom_dataset["entry-data1"], HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_204_NO_CONTENT new_requests_meta = DepositRequest.objects.filter( deposit=complete_deposit, type="metadata" ) assert len(new_requests_meta) == nb_metadata + 1 request_meta1 = new_requests_meta[0] raw_metadata1 = request_meta1.raw_metadata assert raw_metadata1 == atom_dataset["entry-data1"] # check we did not touch the other parts requests_archive1 = DepositRequest.objects.filter( deposit=complete_deposit, type="archive" ) assert len(requests_archive1) == nb_archives assert set(actual_existing_requests_archive) == set(requests_archive1) # Ensure metadata stored in the metadata storage is consistent metadata_authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url=complete_deposit.client.provider_url, metadata={"name": complete_deposit.client.last_name}, ) actual_authority = swh_storage.metadata_authority_get( MetadataAuthorityType.DEPOSIT_CLIENT, url=complete_deposit.client.provider_url ) assert actual_authority == metadata_authority config = APIConfig() metadata_fetcher = MetadataFetcher( name=config.tool["name"], version=config.tool["version"], metadata=config.tool["configuration"], ) actual_fetcher = swh_storage.metadata_fetcher_get( config.tool["name"], config.tool["version"] ) assert actual_fetcher == metadata_fetcher directory_swhid = parse_swhid(complete_deposit.swhid) page_results = swh_storage.raw_extrinsic_metadata_get( MetadataTargetType.DIRECTORY, directory_swhid, metadata_authority ) assert page_results == PagedResult( results=[ RawExtrinsicMetadata( type=MetadataTargetType.DIRECTORY, target=directory_swhid, discovery_date=request_meta1.date, authority=attr.evolve(metadata_authority, metadata=None), fetcher=attr.evolve(metadata_fetcher, metadata=None), format="sword-v2-atom-codemeta", metadata=raw_metadata1.encode(), origin=complete_deposit.origin_url, ) ], next_page_token=None, ) def test_put_update_metadata_done_deposit_failure_mismatched_swhid( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit with SWHID not matching the deposit's. Response: 400 """ incorrect_swhid = "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea" assert complete_deposit.swhid != incorrect_swhid update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) response = put_atom( authenticated_client, update_uri, data=atom_dataset["entry-data1"], HTTP_X_CHECK_SWHID=incorrect_swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Mismatched provided SWHID" in response.content def test_put_update_metadata_done_deposit_failure_malformed_xml( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done with a malformed xml Response: 400 """ update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) response = put_atom( authenticated_client, update_uri, data=atom_dataset["entry-data-ko"], HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Malformed xml metadata" in response.content def test_put_update_metadata_done_deposit_failure_empty_xml( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done with an empty xml. Response: 400 """ update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) atom_content = atom_dataset["entry-data-empty-body"] response = put_atom( authenticated_client, update_uri, data=atom_content, HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Empty body request is not supported" in response.content def test_put_update_metadata_done_deposit_failure_functional_checks( tmp_path, authenticated_client, complete_deposit, deposit_collection, atom_dataset, swh_storage, ): """failure: client updates metadata on deposit done without required incomplete metadata Response: 400 """ update_uri = reverse(EDIT_IRI, args=[deposit_collection.name, complete_deposit.id]) response = put_atom( authenticated_client, update_uri, # no title, nor author, nor name fields data=atom_dataset["entry-data-fail-metadata-functional-checks"], HTTP_X_CHECK_SWHID=complete_deposit.swhid, ) assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"Functional metadata checks failure" in response.content # detail on the errors msg = ( b"- Mandatory fields are missing (" b"atom:name or atom:title or codemeta:name, " b"atom:author or codemeta:author)" ) assert msg in response.content def test_put_atom_with_create_origin_and_external_identifier( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """ was deprecated before was introduced, clients should get an error when trying to use both """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) response = post_atom( authenticated_client, url, data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) for link in response_content["atom:link"]: if link["@rel"] == "edit": edit_iri = link["@href"] break else: assert False, response_content # when response = put_atom( authenticated_client, edit_iri, data=atom_dataset["error-with-external-identifier"] % external_id, HTTP_IN_PROGRESS="false", ) assert b"<external_identifier> is deprecated" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_put_atom_with_create_origin_and_reference( authenticated_client, deposit_collection, atom_dataset, deposit_user ): """ and are mutually exclusive """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id url = reverse(COL_IRI, args=[deposit_collection.name]) response = post_atom( authenticated_client, url, data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) for link in response_content["atom:link"]: if link["@rel"] == "edit": edit_iri = link["@href"] break else: assert False, response_content # when response = put_atom( authenticated_client, edit_iri, data=atom_dataset["entry-data-with-origin-reference"].format(url=origin_url), HTTP_IN_PROGRESS="false", ) assert b"only one may be used on a given deposit" in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST diff --git a/swh/deposit/tests/api/test_deposit_update_binary.py b/swh/deposit/tests/api/test_deposit_update_binary.py index 77cb4388..027845c2 100644 --- a/swh/deposit/tests/api/test_deposit_update_binary.py +++ b/swh/deposit/tests/api/test_deposit_update_binary.py @@ -1,408 +1,408 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests updates on EM-IRI""" from io import BytesIO from django.core.files.uploadedfile import InMemoryUploadedFile -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED, EM_IRI, SE_IRI from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import ( check_archive, create_arborescence_archive, post_archive, post_atom, put_archive, put_atom, ) def test_post_deposit_binary_and_post_to_add_another_archive( authenticated_client, deposit_collection, sample_archive, tmp_path ): """Updating a deposit should return a 201 with receipt """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="true", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == "partial" assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit assert deposit_request.type == "archive" check_archive(sample_archive["name"], deposit_request.archive.name) # 2nd archive to upload archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) # uri to update the content update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) # adding another archive for the deposit and finalizing it response = post_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = list( DepositRequest.objects.filter(deposit=deposit).order_by("id") ) # 2 deposit requests for the same deposit assert len(deposit_requests) == 2 assert deposit_requests[0].deposit == deposit assert deposit_requests[0].type == "archive" check_archive(sample_archive["name"], deposit_requests[0].archive.name) assert deposit_requests[1].deposit == deposit assert deposit_requests[1].type == "archive" check_archive(archive2["name"], deposit_requests[1].archive.name) # only 1 deposit in db deposits = Deposit.objects.all() assert len(deposits) == 1 def test_replace_archive_to_deposit_is_possible( tmp_path, partial_deposit, deposit_collection, authenticated_client, sample_archive, atom_dataset, ): """Replace all archive with another one should return a 204 response """ tmp_path = str(tmp_path) # given deposit = partial_deposit requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(sample_archive["name"], requests[0].archive.name) # we have no metadata for that deposit requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 0 response = post_atom( authenticated_client, reverse(SE_IRI, args=[deposit_collection.name, deposit.id]), data=atom_dataset["entry-data1"], HTTP_SLUG=deposit.external_id, HTTP_IN_PROGRESS=True, ) requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = put_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_204_NO_CONTENT requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(archive2["name"], requests[0].archive.name) # check we did not touch the other parts requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 def test_add_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 997 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.post( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_replace_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Replacing archive to unknown deposit should return a 404 response """ unknown_deposit_id = 996 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.put( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_add_archive_to_deposit_is_possible( tmp_path, authenticated_client, deposit_collection, partial_deposit_with_metadata, sample_archive, ): """Add another archive to a deposit return a 201 response """ tmp_path = str(tmp_path) deposit = partial_deposit_with_metadata requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests) == 1 check_archive(sample_archive["name"], requests[0].archive.name) requests_meta0 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta0) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = post_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="archive").order_by( "id" ) assert len(requests) == 2 # first archive still exists check_archive(sample_archive["name"], requests[0].archive.name) # a new one was added check_archive(archive2["name"], requests[1].archive.name) # check we did not touch the other parts requests_meta1 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta1) == 1 assert set(requests_meta0) == set(requests_meta1) def test_post_deposit_then_update_refused( authenticated_client, deposit_collection, sample_archive, atom_dataset, tmp_path ): """Updating a deposit with status 'ready' should return a 400 """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit check_archive(sample_archive["name"], deposit_request.archive.name) # updating/adding is forbidden # uri to update the content edit_iri = reverse("edit_iri", args=[deposit_collection.name, deposit_id]) se_iri = reverse("se_iri", args=[deposit_collection.name, deposit_id]) em_iri = reverse("em_iri", args=[deposit_collection.name, deposit_id]) # Testing all update/add endpoint should fail # since the status is ready archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file 2" ) # replacing file is no longer possible since the deposit's # status is ready r = put_archive( authenticated_client, em_iri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # adding file is no longer possible since the deposit's status # is ready r = post_archive( authenticated_client, em_iri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # replacing metadata is no longer possible since the deposit's # status is ready r = put_atom( authenticated_client, edit_iri, data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # adding new metadata is no longer possible since the # deposit's status is ready r = post_atom( authenticated_client, se_iri, data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content archive_content = b"some content representing archive" archive = InMemoryUploadedFile( BytesIO(archive_content), field_name="archive0", name="archive0", content_type="application/zip", size=len(archive_content), charset=None, ) atom_entry = InMemoryUploadedFile( BytesIO(atom_dataset["entry-data-deposit-binary"].encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(atom_dataset["entry-data-deposit-binary"]), charset="utf-8", ) # replacing multipart metadata is no longer possible since the # deposit's status is ready r = authenticated_client.put( edit_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( se_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content diff --git a/swh/deposit/tests/api/test_get_file.py b/swh/deposit/tests/api/test_get_file.py index 303df343..1d9d0ee3 100644 --- a/swh/deposit/tests/api/test_get_file.py +++ b/swh/deposit/tests/api/test_get_file.py @@ -1,51 +1,51 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests 'GET File-IRI'.""" -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.config import CONT_FILE_IRI from swh.deposit.models import DEPOSIT_STATUS_DETAIL from swh.deposit.parsers import parse_xml def test_api_deposit_content_nominal( client, complete_deposit, partial_deposit_only_metadata ): """Retrieve information on deposit should return 200 response """ for deposit in [complete_deposit, partial_deposit_only_metadata]: expected_deposit = { "swh:deposit_id": str(deposit.id), "swh:deposit_status": deposit.status, "swh:deposit_status_detail": DEPOSIT_STATUS_DETAIL[deposit.status], } url = reverse(CONT_FILE_IRI, args=[deposit.collection.name, deposit.id]) response = client.get(url) assert response.status_code == status.HTTP_200_OK actual_deposit = dict(parse_xml(response.content)) del actual_deposit["swh:deposit_date"] assert set(actual_deposit.items()) >= set(expected_deposit.items()) def test_api_deposit_content_unknown(client, complete_deposit, deposit_collection): """Retrieve information on unknown deposit or collection should return 404 """ unknown_deposit_id = 999 unknown_collection = "unknown" for collection, deposit_id in [ (deposit_collection.name, unknown_deposit_id), (unknown_collection, complete_deposit.id), (complete_deposit.collection.name, complete_deposit.id + 10), ]: url = reverse(CONT_FILE_IRI, args=[collection, deposit_id]) response = client.get(url) assert response.status_code == status.HTTP_404_NOT_FOUND diff --git a/swh/deposit/tests/api/test_service_document.py b/swh/deposit/tests/api/test_service_document.py index d1d24fa8..d57d3a85 100644 --- a/swh/deposit/tests/api/test_service_document.py +++ b/swh/deposit/tests/api/test_service_document.py @@ -1,82 +1,82 @@ -# Copyright (C) 2017-2019 The Software Heritage developers +# Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.urls import reverse +from django.urls import reverse_lazy as reverse from rest_framework import status from swh.deposit.config import SD_IRI def test_service_document_no_auth_fails(client): """Without authentication, service document endpoint should return 401 """ url = reverse(SD_IRI) response = client.get(url) assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_service_document_no_auth_with_http_auth_should_not_break(client): """Without auth, sd endpoint through browser should return 401 """ url = reverse(SD_IRI) response = client.get(url, HTTP_ACCEPT="text/html,application/xml;q=9,*/*,q=8") assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_service_document(authenticated_client, deposit_user): """With authentication, service document list user's collection """ url = reverse(SD_IRI) response = authenticated_client.get(url) check_response(response, deposit_user.username) def test_service_document_with_http_accept_header(authenticated_client, deposit_user): """With authentication, with browser, sd list user's collection """ url = reverse(SD_IRI) response = authenticated_client.get( url, HTTP_ACCEPT="text/html,application/xml;q=9,*/*,q=8" ) check_response(response, deposit_user.username) def check_response(response, username): assert response.status_code == status.HTTP_200_OK assert ( response.content.decode("utf-8") == """ 2.0 %s The Software Heritage (SWH) Archive %s Software Collection application/zip application/x-tar Collection Policy Software Heritage Archive Collect, Preserve, Share false false http://purl.org/net/sword/package/SimpleZip http://testserver/1/%s/ %s """ % (500, username, username, username, username) ) # noqa diff --git a/swh/deposit/tests/conftest.py b/swh/deposit/tests/conftest.py index d01970c7..131b67f9 100644 --- a/swh/deposit/tests/conftest.py +++ b/swh/deposit/tests/conftest.py @@ -1,479 +1,479 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 from functools import partial from io import BytesIO import os import re from typing import Mapping from django.test.utils import setup_databases # type: ignore -from django.urls import reverse +from django.urls import reverse_lazy as reverse import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import pytest from rest_framework import status from rest_framework.test import APIClient import yaml from swh.core.config import read from swh.core.pytest_plugin import get_response_cb from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_VERIFIED, SE_IRI, setup_django_for, ) from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import ( create_arborescence_archive, post_archive, post_atom, ) from swh.model.identifiers import DIRECTORY, REVISION, SNAPSHOT, swhid from swh.scheduler import get_scheduler # mypy is asked to ignore the import statement above because setup_databases # is not part of the d.t.utils.__all__ variable. TEST_USER = { "username": "test", "password": "password", "email": "test@example.org", "provider_url": "https://hal-test.archives-ouvertes.fr/", "domain": "archives-ouvertes.fr/", "collection": {"name": "test"}, } ANOTHER_TEST_USER = { "username": "test2", "password": "password2", "email": "test@example2.org", "provider_url": "https://hal-test.archives-ouvertes.example/", "domain": "archives-ouvertes.example/", "collection": {"name": "another-collection"}, } def pytest_configure(): setup_django_for("testing") @pytest.fixture def requests_mock_datadir(datadir, requests_mock_datadir): """Override default behavior to deal with put/post methods """ cb = partial(get_response_cb, datadir=datadir) requests_mock_datadir.put(re.compile("https://"), body=cb) requests_mock_datadir.post(re.compile("https://"), body=cb) return requests_mock_datadir @pytest.fixture() def deposit_config(swh_scheduler_config, swh_storage_backend_config): return { "max_upload_size": 500, "extraction_dir": "/tmp/swh-deposit/test/extraction-dir", "checks": False, "scheduler": {"cls": "local", **swh_scheduler_config,}, "storage_metadata": swh_storage_backend_config, } @pytest.fixture() def deposit_config_path(tmp_path, monkeypatch, deposit_config): conf_path = os.path.join(tmp_path, "deposit.yml") with open(conf_path, "w") as f: f.write(yaml.dump(deposit_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) return conf_path @pytest.fixture(autouse=True) def deposit_autoconfig(deposit_config_path): """Enforce config for deposit classes inherited from APIConfig.""" cfg = read(deposit_config_path) if "scheduler" in cfg: # scheduler setup: require the check-deposit and load-deposit tasks scheduler = get_scheduler(**cfg["scheduler"]) task_types = [ { "type": "check-deposit", "backend_name": "swh.deposit.loader.tasks.ChecksDepositTsk", "description": "Check deposit metadata/archive before loading", "num_retries": 3, }, { "type": "load-deposit", "backend_name": "swh.loader.package.deposit.tasks.LoadDeposit", "description": "Loading deposit archive into swh archive", "num_retries": 3, }, ] for task_type in task_types: scheduler.create_task_type(task_type) @pytest.fixture(scope="session") def django_db_setup(request, django_db_blocker, postgresql_proc): from django.conf import settings settings.DATABASES["default"].update( { ("ENGINE", "django.db.backends.postgresql"), ("NAME", "tests"), ("USER", postgresql_proc.user), # noqa ("HOST", postgresql_proc.host), # noqa ("PORT", postgresql_proc.port), # noqa } ) with django_db_blocker.unblock(): setup_databases( verbosity=request.config.option.verbose, interactive=False, keepdb=False ) def execute_sql(sql): """Execute sql to postgres db""" with psycopg2.connect(database="postgres") as conn: conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cur = conn.cursor() cur.execute(sql) @pytest.fixture(autouse=True, scope="session") def swh_proxy(): """Automatically inject this fixture in all tests to ensure no outside connection takes place. """ os.environ["http_proxy"] = "http://localhost:999" os.environ["https_proxy"] = "http://localhost:999" def create_deposit_collection(collection_name: str): """Create a deposit collection with name collection_name """ from swh.deposit.models import DepositCollection try: collection = DepositCollection._default_manager.get(name=collection_name) except DepositCollection.DoesNotExist: collection = DepositCollection(name=collection_name) collection.save() return collection def deposit_collection_factory(collection_name=TEST_USER["collection"]["name"]): @pytest.fixture def _deposit_collection(db, collection_name=collection_name): return create_deposit_collection(collection_name) return _deposit_collection deposit_collection = deposit_collection_factory() deposit_another_collection = deposit_collection_factory("another-collection") def _create_deposit_user(db, collection, user_data): """Create/Return the test_user "test" """ from swh.deposit.models import DepositClient try: user = DepositClient._default_manager.get(username=user_data["username"]) except DepositClient.DoesNotExist: user = DepositClient._default_manager.create_user( username=user_data["username"], email=user_data["email"], password=user_data["password"], provider_url=user_data["provider_url"], domain=user_data["domain"], ) user.collections = [collection.id] user.save() return user @pytest.fixture def deposit_user(db, deposit_collection): return _create_deposit_user(db, deposit_collection, TEST_USER) @pytest.fixture def deposit_another_user(db, deposit_another_collection): return _create_deposit_user(db, deposit_another_collection, ANOTHER_TEST_USER) @pytest.fixture def client(): """Override pytest-django one which does not work for djangorestframework. """ return APIClient() # <- drf's client def _create_authenticated_client(client, user, user_data): """Returned a logged client This also patched the client instance to keep a reference on the associated deposit_user. """ _token = "%s:%s" % (user.username, user_data["password"]) token = base64.b64encode(_token.encode("utf-8")) authorization = "Basic %s" % token.decode("utf-8") client.credentials(HTTP_AUTHORIZATION=authorization) client.deposit_client = user yield client client.logout() @pytest.fixture def authenticated_client(client, deposit_user): yield from _create_authenticated_client(client, deposit_user, TEST_USER) @pytest.fixture def another_authenticated_client(deposit_another_user): client = APIClient() yield from _create_authenticated_client( client, deposit_another_user, ANOTHER_TEST_USER ) @pytest.fixture def sample_archive(tmp_path): """Returns a sample archive """ tmp_path = str(tmp_path) # pytest version limitation in previous version archive = create_arborescence_archive( tmp_path, "archive1", "file1", b"some content in file" ) return archive @pytest.fixture def atom_dataset(datadir) -> Mapping[str, str]: """Compute the paths to atom files. Returns: Dict of atom name per content (bytes) """ atom_path = os.path.join(datadir, "atom") data = {} for filename in os.listdir(atom_path): filepath = os.path.join(atom_path, filename) with open(filepath, "rb") as f: raw_content = f.read().decode("utf-8") # Keep the filename without extension atom_name = filename.split(".")[0] data[atom_name] = raw_content return data def create_deposit( authenticated_client, collection_name: str, sample_archive, external_id: str, deposit_status=DEPOSIT_STATUS_DEPOSITED, in_progress=False, ): """Create a skeleton shell deposit """ url = reverse(COL_IRI, args=[collection_name]) # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS=str(in_progress).lower(), ) # then assert response.status_code == status.HTTP_201_CREATED, response.content.decode() from swh.deposit.models import Deposit response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit._default_manager.get(id=deposit_id) if deposit.status != deposit_status: deposit.status = deposit_status deposit.save() assert deposit.status == deposit_status return deposit def create_binary_deposit( authenticated_client, collection_name: str, deposit_status: str = DEPOSIT_STATUS_DEPOSITED, atom_dataset: Mapping[str, bytes] = {}, **kwargs, ): """Create a deposit with both metadata and archive set. Then alters its status to `deposit_status`. """ deposit = create_deposit( authenticated_client, collection_name, deposit_status=DEPOSIT_STATUS_PARTIAL, **kwargs, ) origin_url = deposit.client.provider_url + deposit.external_id response = post_atom( authenticated_client, reverse(SE_IRI, args=[collection_name, deposit.id]), data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_201_CREATED assert deposit.status == DEPOSIT_STATUS_PARTIAL from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(pk=deposit.id) assert deposit.status == deposit_status return deposit def deposit_factory(deposit_status=DEPOSIT_STATUS_DEPOSITED, in_progress=False): """Build deposit with a specific status """ @pytest.fixture() def _deposit( sample_archive, deposit_collection, authenticated_client, deposit_status=deposit_status, ): external_id = "external-id-%s" % deposit_status return create_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id=external_id, deposit_status=deposit_status, in_progress=in_progress, ) return _deposit deposited_deposit = deposit_factory() rejected_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_REJECTED) partial_deposit = deposit_factory( deposit_status=DEPOSIT_STATUS_PARTIAL, in_progress=True ) verified_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_VERIFIED) completed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS) failed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_FAILURE) @pytest.fixture def partial_deposit_with_metadata( sample_archive, deposit_collection, authenticated_client, atom_dataset ): """Returns deposit with archive and metadata provided, status 'partial' """ return create_binary_deposit( authenticated_client, deposit_collection.name, sample_archive=sample_archive, external_id="external-id-partial", in_progress=True, deposit_status=DEPOSIT_STATUS_PARTIAL, atom_dataset=atom_dataset, ) @pytest.fixture def partial_deposit_only_metadata( deposit_collection, authenticated_client, atom_dataset ): response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data1"], HTTP_SLUG="external-id-partial", HTTP_IN_PROGRESS=True, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_id = response_content["swh:deposit_id"] from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_PARTIAL return deposit @pytest.fixture def complete_deposit(sample_archive, deposit_collection, authenticated_client): """Returns a completed deposit (load success) """ deposit = create_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id="external-id-complete", deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS, ) origin = "https://hal.archives-ouvertes.fr/hal-01727745" directory_id = "42a13fc721c8716ff695d0d62fc851d641f3a12b" revision_id = "548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10" snapshot_id = "e5e82d064a9c3df7464223042e0c55d72ccff7f0" deposit.swhid = swhid(DIRECTORY, directory_id) deposit.swhid_context = swhid( DIRECTORY, directory_id, metadata={ "origin": origin, "visit": swhid(SNAPSHOT, snapshot_id), "anchor": swhid(REVISION, revision_id), "path": "/", }, ) deposit.save() return deposit @pytest.fixture() def tmp_path(tmp_path): return str(tmp_path) # issue with oldstable's pytest version