diff --git a/requirements-server.txt b/requirements-server.txt index ddcc67aa..dd72a79b 100644 --- a/requirements-server.txt +++ b/requirements-server.txt @@ -1,4 +1,4 @@ -django < 3 +django >= 2, < 4 djangorestframework psycopg2 < 2.9 setuptools diff --git a/swh/deposit/auth.py b/swh/deposit/auth.py index e12da8b1..655d05a5 100644 --- a/swh/deposit/auth.py +++ b/swh/deposit/auth.py @@ -1,186 +1,186 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from typing import Optional from django.core.cache import cache from django.utils import timezone from rest_framework import status from rest_framework.authentication import BasicAuthentication from rest_framework.exceptions import AuthenticationFailed from rest_framework.permissions import BasePermission from sentry_sdk import capture_exception from swh.auth.django.models import OIDCUser from swh.auth.django.utils import oidc_user_from_profile from swh.auth.keycloak import ( KeycloakError, KeycloakOpenIDConnect, keycloak_error_message, ) from swh.deposit.models import DepositClient from .errors import UNAUTHORIZED, make_error_response logger = logging.getLogger(__name__) OIDC_DEPOSIT_CLIENT_ID = "swh-deposit" DEPOSIT_PERMISSION = "swh.deposit.api" def convert_response(request, content): """Convert response from drf's basic authentication mechanism to a swh-deposit one. Args: request (Request): Use to build the response content (bytes): The drf's answer Returns: Response with the same status error as before, only the body is now an swh-deposit compliant one. """ from json import loads content = loads(content.decode("utf-8")) detail = content.get("detail") if detail: verbose_description = "API is protected by basic authentication" else: detail = "API is protected by basic authentication" verbose_description = None response = make_error_response( request, UNAUTHORIZED, summary=detail, verbose_description=verbose_description ) response["WWW-Authenticate"] = 'Basic realm=""' return response class WrapBasicAuthenticationResponseMiddleware: """Middleware to capture potential authentication error and convert them to standard deposit response. This is to be installed in django's settings.py module. """ def __init__(self, get_response): super().__init__() self.get_response = get_response def __call__(self, request): response = self.get_response(request) if response.status_code is status.HTTP_401_UNAUTHORIZED: - content_type = response._headers.get("content-type") - if content_type == ("Content-Type", "application/json"): + content_type = response.get("content-type") + if content_type == "application/json": return convert_response(request, response.content) return response class HasDepositPermission(BasePermission): """Allows access to authenticated users with the DEPOSIT_PERMISSION. """ def has_permission(self, request, view): assert isinstance(request.user, DepositClient) return request.user.oidc_user.has_perm(DEPOSIT_PERMISSION) class KeycloakBasicAuthentication(BasicAuthentication): """Keycloack authentication against username/password. Deposit users will continue sending `Basic authentication` queries to the deposit server. Transparently, the deposit server will stop authenticate itself the users. It will delegate the authentication queries to the keycloak instance. Technically, reuses :class:`rest_framework.BasicAuthentication` and overrides the func:`authenticate_credentials` method to discuss with keycloak. As an implementation detail, this also uses the django cache mechanism to avoid too many authentication request to keycloak. """ _client: Optional[KeycloakOpenIDConnect] = None @property def client(self): if self._client is None: self._client = KeycloakOpenIDConnect.from_configfile( client_id=OIDC_DEPOSIT_CLIENT_ID ) return self._client def _cache_key(self, user_id: str) -> str: """Internal key to use to store user id token. """ return f"oidc_user_{self.client.realm_name}_{self.client.client_id}_{user_id}" def get_user(self, user_id: str) -> Optional[OIDCUser]: """Retrieve user from cache if any. """ oidc_profile = cache.get(self._cache_key(user_id)) if oidc_profile: try: return oidc_user_from_profile(self.client, oidc_profile) except Exception as e: logger.warning("Error during cache token retrieval: %s", e) capture_exception(e) return None def authenticate_credentials(self, user_id, password, request): """Authenticate the user_id/password against keycloak. Raises: AuthenticationFailed in case of authentication failure Returns: Tuple of deposit_client, None. """ oidc_user = self.get_user(user_id) ttl: Optional[int] = None if not oidc_user: try: oidc_profile = self.client.login(user_id, password) except KeycloakError as e: logger.debug("KeycloakError: e: %s", e) error_msg = keycloak_error_message(e) raise AuthenticationFailed(error_msg) oidc_user = oidc_user_from_profile(self.client, oidc_profile) ttl = int( oidc_user.refresh_expires_at.timestamp() - timezone.now().timestamp() ) # Making sure the associated deposit client is correctly configured in backend try: deposit_client = DepositClient.objects.get(username=user_id) except DepositClient.DoesNotExist: raise AuthenticationFailed(f"Unknown user {user_id}") if not deposit_client.is_active: raise AuthenticationFailed(f"Deactivated user {user_id}") deposit_client.oidc_user = oidc_user if ttl: # cache the oidc_profile user while it's valid cache.set( self._cache_key(user_id), oidc_profile, timeout=max(0, ttl), ) return (deposit_client, None) diff --git a/swh/deposit/tests/api/test_collection_reuse_slug.py b/swh/deposit/tests/api/test_collection_reuse_slug.py index 99670b57..8f4bfbd8 100644 --- a/swh/deposit/tests/api/test_collection_reuse_slug.py +++ b/swh/deposit/tests/api/test_collection_reuse_slug.py @@ -1,283 +1,284 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO from django.urls import reverse_lazy as reverse from rest_framework import status +import xmltodict from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_PARTIAL, SE_IRI, ) from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import post_atom from ..conftest import internal_create_deposit def test_act_on_deposit_rejected_is_not_permitted( authenticated_client, deposit_collection, rejected_deposit, atom_dataset ): deposit = rejected_deposit response = post_atom( authenticated_client, reverse(SE_IRI, args=[deposit.collection.name, deposit.id]), data=atom_dataset["entry-data1"], HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_400_BAD_REQUEST - msg = "You can only act on deposit with status '%s'" % ( - DEPOSIT_STATUS_PARTIAL, + assert ( + xmltodict.parse(response.content)["sword:error"]["summary"] + == f"You can only act on deposit with status '{DEPOSIT_STATUS_PARTIAL}'" ) - assert msg in response.content.decode("utf-8") def test_add_deposit_when_partial_makes_new_deposit( authenticated_client, deposit_collection, partial_deposit, atom_dataset, deposit_user, ): """Posting deposit on collection when previous is partial makes new deposit """ deposit = partial_deposit assert deposit.status == DEPOSIT_STATUS_PARTIAL origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED, response.content.decode() response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id # new deposit new_deposit = Deposit.objects.get(pk=deposit_id) assert new_deposit != deposit assert new_deposit.parent is None assert new_deposit.origin_url == origin_url def test_add_deposit_when_failed_makes_new_deposit_with_no_parent( authenticated_client, deposit_collection, failed_deposit, atom_dataset, deposit_user ): """Posting deposit on collection when deposit done makes new deposit with parent """ deposit = failed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_FAILURE origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert new_deposit != deposit assert new_deposit.parent is None assert new_deposit.origin_url == origin_url def test_add_deposit_when_done_makes_new_deposit_with_parent_old_one( authenticated_client, deposit_collection, completed_deposit, atom_dataset, deposit_user, ): """Posting deposit on collection when deposit done makes new deposit with parent """ # given multiple deposit already loaded deposit = completed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == new_deposit.collection assert deposit.origin_url == origin_url assert new_deposit != deposit assert new_deposit.parent == deposit assert new_deposit.origin_url == origin_url def test_add_deposit_with_external_identifier( authenticated_client, deposit_collection, completed_deposit, atom_dataset, deposit_user, ): """Even though is deprecated, it should still be allowed when it matches the slug, so that we don't break existing clients """ # given multiple deposit already loaded deposit = completed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS origin_url = deposit_user.provider_url + deposit.external_id # adding a new deposit with the same external id as a completed deposit # creates the parenting chain response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["error-with-external-identifier"] % deposit.external_id, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == new_deposit.collection assert deposit.origin_url == origin_url assert new_deposit != deposit assert new_deposit.parent == deposit assert new_deposit.origin_url == origin_url def test_add_deposit_external_id_conflict_no_parent( authenticated_client, deposit_collection, deposit_another_collection, atom_dataset, deposit_user, deposit_another_user, ): """Posting a deposit with an external_id conflicting with an external_id of a different client does not create a parent relationship """ external_id = "foobar" origin_url = deposit_user.provider_url + external_id # create a deposit for that other user, with the same slug other_deposit = internal_create_deposit( deposit_another_user, deposit_another_collection, external_id, DEPOSIT_STATUS_LOAD_SUCCESS, ) # adding a new deposit with the same external id as a completed deposit response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert other_deposit.id != deposit_id new_deposit = Deposit.objects.get(pk=deposit_id) assert new_deposit.parent is None assert new_deposit.origin_url == origin_url def test_add_deposit_external_id_conflict_with_parent( authenticated_client, deposit_collection, deposit_another_collection, completed_deposit, atom_dataset, deposit_user, deposit_another_user, ): """Posting a deposit with an external_id conflicting with an external_id of a different client creates a parent relationship with the deposit of the right client instead of the last matching deposit This test does not have an equivalent for origin url conflicts, as these can not happen (assuming clients do not have provider_url overlaps) """ # given multiple deposit already loaded deposit = completed_deposit assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS origin_url = deposit_user.provider_url + deposit.external_id # create a deposit for that other user, with the same slug other_deposit = internal_create_deposit( deposit_another_user, deposit_another_collection, deposit.external_id, DEPOSIT_STATUS_LOAD_SUCCESS, ) # adding a new deposit with the same external id as a completed deposit response = post_atom( authenticated_client, reverse(COL_IRI, args=[deposit_collection.name]), data=atom_dataset["entry-data0"] % origin_url, HTTP_SLUG=deposit.external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] assert deposit_id != deposit.id assert other_deposit.id != deposit.id new_deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == new_deposit.collection assert deposit.external_id == new_deposit.external_id assert new_deposit != deposit assert new_deposit.parent == deposit assert new_deposit.origin_url == origin_url diff --git a/swh/deposit/tests/api/test_delete.py b/swh/deposit/tests/api/test_delete.py index 80479df1..8121f01f 100644 --- a/swh/deposit/tests/api/test_delete.py +++ b/swh/deposit/tests/api/test_delete.py @@ -1,131 +1,134 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from typing import Dict, Mapping from django.urls import reverse_lazy as reverse from rest_framework import status +import xmltodict from swh.deposit.config import ( ARCHIVE_KEY, DEPOSIT_STATUS_DEPOSITED, EDIT_IRI, EM_IRI, METADATA_KEY, ) from swh.deposit.models import Deposit, DepositRequest def count_deposit_request_types(deposit_requests) -> Mapping[str, int]: deposit_request_types = defaultdict(int) # type: Dict[str, int] for dr in deposit_requests: deposit_request_types[dr.type] += 1 return deposit_request_types def test_delete_archive_on_partial_deposit_works( authenticated_client, partial_deposit_with_metadata, deposit_collection ): """Removing partial deposit's archive should return a 204 response """ deposit_id = partial_deposit_with_metadata.id deposit = Deposit.objects.get(pk=deposit_id) deposit_requests = DepositRequest.objects.filter(deposit=deposit) # deposit request type: 'archive', 1 'metadata' deposit_request_types = count_deposit_request_types(deposit_requests) assert deposit_request_types == {ARCHIVE_KEY: 1, METADATA_KEY: 1} # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_204_NO_CONTENT deposit = Deposit.objects.get(pk=deposit_id) deposit_requests2 = DepositRequest.objects.filter(deposit=deposit) deposit_request_types = count_deposit_request_types(deposit_requests2) assert deposit_request_types == {METADATA_KEY: 1} def test_delete_archive_on_undefined_deposit_fails( authenticated_client, deposit_collection, sample_archive ): """Delete undefined deposit returns a 404 response """ # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, 999]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_404_NOT_FOUND def test_delete_non_partial_deposit( authenticated_client, deposit_collection, deposited_deposit ): """Delete !partial status deposit should return a 400 response """ deposit = deposited_deposit assert deposit.status == DEPOSIT_STATUS_DEPOSITED # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( - b"You can only act on deposit with status 'partial'" in response.content + xmltodict.parse(response.content)["sword:error"]["summary"] + == "You can only act on deposit with status 'partial'" ) deposit = Deposit.objects.get(pk=deposit.id) assert deposit is not None def test_delete_partial_deposit( authenticated_client, deposit_collection, partial_deposit ): """Delete deposit should return a 204 response """ # given deposit = partial_deposit # when url = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(url) # then assert response.status_code == status.HTTP_204_NO_CONTENT deposit_requests = list(DepositRequest.objects.filter(deposit=deposit)) assert deposit_requests == [] deposits = list(Deposit.objects.filter(pk=deposit.id)) assert deposits == [] def test_delete_on_edit_iri_cannot_delete_non_partial_deposit( authenticated_client, deposit_collection, complete_deposit ): """Delete !partial deposit should return a 400 response """ # given deposit = complete_deposit # when url = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(url) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( - b"You can only act on deposit with status 'partial'" in response.content + xmltodict.parse(response.content)["sword:error"]["summary"] + == "You can only act on deposit with status 'partial'" ) deposit = Deposit.objects.get(pk=deposit.id) assert deposit is not None diff --git a/swh/deposit/tests/api/test_deposit_update_binary.py b/swh/deposit/tests/api/test_deposit_update_binary.py index 027845c2..ed9bc585 100644 --- a/swh/deposit/tests/api/test_deposit_update_binary.py +++ b/swh/deposit/tests/api/test_deposit_update_binary.py @@ -1,408 +1,427 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests updates on EM-IRI""" from io import BytesIO from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse_lazy as reverse from rest_framework import status +import xmltodict from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED, EM_IRI, SE_IRI from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import ( check_archive, create_arborescence_archive, post_archive, post_atom, put_archive, put_atom, ) def test_post_deposit_binary_and_post_to_add_another_archive( authenticated_client, deposit_collection, sample_archive, tmp_path ): """Updating a deposit should return a 201 with receipt """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="true", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == "partial" assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit assert deposit_request.type == "archive" check_archive(sample_archive["name"], deposit_request.archive.name) # 2nd archive to upload archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) # uri to update the content update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) # adding another archive for the deposit and finalizing it response = post_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = list( DepositRequest.objects.filter(deposit=deposit).order_by("id") ) # 2 deposit requests for the same deposit assert len(deposit_requests) == 2 assert deposit_requests[0].deposit == deposit assert deposit_requests[0].type == "archive" check_archive(sample_archive["name"], deposit_requests[0].archive.name) assert deposit_requests[1].deposit == deposit assert deposit_requests[1].type == "archive" check_archive(archive2["name"], deposit_requests[1].archive.name) # only 1 deposit in db deposits = Deposit.objects.all() assert len(deposits) == 1 def test_replace_archive_to_deposit_is_possible( tmp_path, partial_deposit, deposit_collection, authenticated_client, sample_archive, atom_dataset, ): """Replace all archive with another one should return a 204 response """ tmp_path = str(tmp_path) # given deposit = partial_deposit requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(sample_archive["name"], requests[0].archive.name) # we have no metadata for that deposit requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 0 response = post_atom( authenticated_client, reverse(SE_IRI, args=[deposit_collection.name, deposit.id]), data=atom_dataset["entry-data1"], HTTP_SLUG=deposit.external_id, HTTP_IN_PROGRESS=True, ) requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = put_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_204_NO_CONTENT requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(archive2["name"], requests[0].archive.name) # check we did not touch the other parts requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 def test_add_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 997 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.post( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_replace_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Replacing archive to unknown deposit should return a 404 response """ unknown_deposit_id = 996 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.put( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_add_archive_to_deposit_is_possible( tmp_path, authenticated_client, deposit_collection, partial_deposit_with_metadata, sample_archive, ): """Add another archive to a deposit return a 201 response """ tmp_path = str(tmp_path) deposit = partial_deposit_with_metadata requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests) == 1 check_archive(sample_archive["name"], requests[0].archive.name) requests_meta0 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta0) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = post_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="archive").order_by( "id" ) assert len(requests) == 2 # first archive still exists check_archive(sample_archive["name"], requests[0].archive.name) # a new one was added check_archive(archive2["name"], requests[1].archive.name) # check we did not touch the other parts requests_meta1 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta1) == 1 assert set(requests_meta0) == set(requests_meta1) def test_post_deposit_then_update_refused( authenticated_client, deposit_collection, sample_archive, atom_dataset, tmp_path ): """Updating a deposit with status 'ready' should return a 400 """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit check_archive(sample_archive["name"], deposit_request.archive.name) # updating/adding is forbidden # uri to update the content edit_iri = reverse("edit_iri", args=[deposit_collection.name, deposit_id]) se_iri = reverse("se_iri", args=[deposit_collection.name, deposit_id]) em_iri = reverse("em_iri", args=[deposit_collection.name, deposit_id]) # Testing all update/add endpoint should fail # since the status is ready archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file 2" ) # replacing file is no longer possible since the deposit's # status is ready r = put_archive( authenticated_client, em_iri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert r.status_code == status.HTTP_400_BAD_REQUEST - assert b"You can only act on deposit with status 'partial'" in r.content + assert ( + xmltodict.parse(r.content)["sword:error"]["summary"] + == "You can only act on deposit with status 'partial'" + ) # adding file is no longer possible since the deposit's status # is ready r = post_archive( authenticated_client, em_iri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert r.status_code == status.HTTP_400_BAD_REQUEST - assert b"You can only act on deposit with status 'partial'" in r.content + assert ( + xmltodict.parse(r.content)["sword:error"]["summary"] + == "You can only act on deposit with status 'partial'" + ) # replacing metadata is no longer possible since the deposit's # status is ready r = put_atom( authenticated_client, edit_iri, data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST - assert b"You can only act on deposit with status 'partial'" in r.content + assert ( + xmltodict.parse(r.content)["sword:error"]["summary"] + == "You can only act on deposit with status 'partial'" + ) # adding new metadata is no longer possible since the # deposit's status is ready r = post_atom( authenticated_client, se_iri, data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST - assert b"You can only act on deposit with status 'partial'" in r.content + assert ( + xmltodict.parse(r.content)["sword:error"]["summary"] + == "You can only act on deposit with status 'partial'" + ) archive_content = b"some content representing archive" archive = InMemoryUploadedFile( BytesIO(archive_content), field_name="archive0", name="archive0", content_type="application/zip", size=len(archive_content), charset=None, ) atom_entry = InMemoryUploadedFile( BytesIO(atom_dataset["entry-data-deposit-binary"].encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(atom_dataset["entry-data-deposit-binary"]), charset="utf-8", ) # replacing multipart metadata is no longer possible since the # deposit's status is ready r = authenticated_client.put( edit_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST - assert b"You can only act on deposit with status 'partial'" in r.content + assert ( + xmltodict.parse(r.content)["sword:error"]["summary"] + == "You can only act on deposit with status 'partial'" + ) # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( se_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST - assert b"You can only act on deposit with status 'partial'" in r.content + assert ( + xmltodict.parse(r.content)["sword:error"]["summary"] + == "You can only act on deposit with status 'partial'" + ) diff --git a/tox.ini b/tox.ini index c27673b6..105f1be9 100644 --- a/tox.ini +++ b/tox.ini @@ -1,81 +1,82 @@ [tox] -envlist=flake8,mypy,py3-django2 +envlist=flake8,mypy,py3-django2,py3-django3 [testenv] extras = testing deps = # the dependency below is needed for now as a workaround for # https://github.com/pypa/pip/issues/6239 swh.core[http] >= 0.3 swh.scheduler[testing] >= 0.5.0 dev: pdbpp pytest-cov django2: Django>=2,<3 + django3: Django>=3,<4 commands = pytest \ !dev: --cov {envsitepackagesdir}/swh/deposit --cov-branch \ {envsitepackagesdir}/swh/deposit \ {posargs} [testenv:black] skip_install = true deps = black==19.10b0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 \ --exclude=.tox,.git,__pycache__,.tox,.eggs,*.egg,swh/deposit/migrations [testenv:mypy] setenv = DJANGO_SETTINGS_MODULE=swh.deposit.settings.testing extras = testing deps = mypy commands = mypy swh # build documentation outside swh-environment using the current # git HEAD of swh-docs, is executed on CI for each diff to prevent # breaking doc build [testenv:sphinx] whitelist_externals = make usedevelop = true extras = testing deps = # fetch and install swh-docs in develop mode -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs # build documentation only inside swh-environment using local state # of swh-docs package [testenv:sphinx-dev] whitelist_externals = make usedevelop = true extras = testing deps = # install swh-docs in develop mode -e ../swh-docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = - make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs \ No newline at end of file + make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs