diff --git a/requirements.txt b/requirements.txt index 15851850..ac7df224 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ click -xmltodict iso8601 requests diff --git a/swh/deposit/cli/client.py b/swh/deposit/cli/client.py index d7adbd54..295c970f 100644 --- a/swh/deposit/cli/client.py +++ b/swh/deposit/cli/client.py @@ -1,656 +1,648 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from contextlib import contextmanager from datetime import datetime, timezone import logging # WARNING: do not import unnecessary things here to keep cli startup time under # control import os import sys from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional import warnings +import xml.etree.ElementTree as ET import click from swh.deposit.cli import deposit +from swh.deposit.utils import NAMESPACES as NS logger = logging.getLogger(__name__) if TYPE_CHECKING: from swh.deposit.client import PublicApiDepositClient class InputError(ValueError): """Input script error """ pass @contextmanager def trap_and_report_exceptions(): """Trap and report exceptions (InputError, MaintenanceError) in a unified way. """ from swh.deposit.client import MaintenanceError try: yield except InputError as e: logger.error("Problem during parsing options: %s", e) sys.exit(1) except MaintenanceError as e: logger.error(e) sys.exit(1) def _url(url: str) -> str: """Force the /1 api version at the end of the url (avoiding confusing issues without it). Args: url (str): api url used by cli users Returns: Top level api url to actually request """ if not url.endswith("/1"): url = "%s/1" % url return url def generate_metadata( deposit_client: str, name: str, authors: List[str], external_id: Optional[str] = None, create_origin: Optional[str] = None, metadata_provenance_url: Optional[str] = None, ) -> str: """Generate sword compliant xml metadata with the minimum required metadata. The Atom spec, https://tools.ietf.org/html/rfc4287, says that: - atom:entry elements MUST contain one or more atom:author elements - atom:entry elements MUST contain exactly one atom:title element. - atom:entry elements MUST contain exactly one atom:updated element. However, we are also using CodeMeta, so we want some basic information to be mandatory. Therefore, we generate the following mandatory fields: - http://www.w3.org/2005/Atom#updated - http://www.w3.org/2005/Atom#author - http://www.w3.org/2005/Atom#title - https://doi.org/10.5063/SCHEMA/CODEMETA-2.0#name (yes, in addition to http://www.w3.org/2005/Atom#title, even if they have somewhat the same meaning) - https://doi.org/10.5063/SCHEMA/CODEMETA-2.0#author Args: deposit_client: Deposit client username, name: Software name authors: List of author names create_origin: Origin concerned by the deposit metadata_provenance_url: Provenance metadata url Returns: metadata xml string """ - import xmltodict - # generate a metadata file with the minimum required metadata - document = { - "atom:entry": { - "@xmlns:atom": "http://www.w3.org/2005/Atom", - "@xmlns:codemeta": "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0", - "@xmlns:schema": "http://schema.org/", - "atom:updated": datetime.now(tz=timezone.utc), # mandatory, cf. docstring - "atom:author": deposit_client, # mandatory, cf. docstring - "atom:title": name, # mandatory, cf. docstring - "codemeta:name": name, # mandatory, cf. docstring - "codemeta:author": [ # mandatory, cf. docstring - {"codemeta:name": author_name} for author_name in authors - ], - }, - } + document = ET.Element(f"{{{NS['atom']}}}entry") + now = datetime.now(tz=timezone.utc) + ET.SubElement(document, f"{{{NS['atom']}}}updated").text = str(now) + ET.SubElement(document, f"{{{NS['atom']}}}author").text = deposit_client + ET.SubElement(document, f"{{{NS['atom']}}}title").text = name + ET.SubElement(document, f"{{{NS['codemeta']}}}name").text = name + for author_name in authors: + author = ET.SubElement(document, f"{{{NS['codemeta']}}}author") + ET.SubElement(author, f"{{{NS['codemeta']}}}name").text = author_name + if external_id: - document["atom:entry"]["codemeta:identifier"] = external_id + ET.SubElement(document, f"{{{NS['codemeta']}}}identifier").text = external_id - swh_deposit_dict: Dict = {} - if create_origin or metadata_provenance_url: - document["atom:entry"][ - "@xmlns:swh" - ] = "https://www.softwareheritage.org/schema/2018/deposit" + swh_deposit_elt = ET.Element(f"{{{NS['swh']}}}deposit") if create_origin: - swh_deposit_dict.update( - {"swh:create_origin": {"swh:origin": {"@url": create_origin}}} - ) + elt = ET.SubElement(swh_deposit_elt, f"{{{NS['swh']}}}create_origin") + ET.SubElement(elt, f"{{{NS['swh']}}}origin").set("url", create_origin) if metadata_provenance_url: - swh_deposit_dict.update( - {"swh:metadata-provenance": {"schema:url": metadata_provenance_url}} - ) + elt = ET.SubElement(swh_deposit_elt, f"{{{NS['swh']}}}metadata-provenance") + ET.SubElement(elt, f"{{{NS['schema']}}}url").text = metadata_provenance_url + + if len(swh_deposit_elt): + document.append(swh_deposit_elt) - if swh_deposit_dict: - document["atom:entry"]["swh:deposit"] = swh_deposit_dict + s = ET.tostring(document, encoding="utf-8").decode() - logging.debug("Atom entry dict to generate as xml: %s", document) - return xmltodict.unparse(document, pretty=True) + logging.debug("Atom entry dict to generate as xml: %s", s) + return s def _collection(client: PublicApiDepositClient) -> str: """Retrieve the client's collection """ # retrieve user's collection sd_content = client.service_document() if "error" in sd_content: msg = sd_content["error"] raise InputError(f"Service document retrieval: {msg}") collection = sd_content["app:service"]["app:workspace"][0]["app:collection"][ "sword:name" ] return collection def client_command_parse_input( client, username: str, archive: Optional[str], metadata: Optional[str], collection: Optional[str], slug: Optional[str], create_origin: Optional[str], metadata_provenance_url: Optional[str], partial: bool, deposit_id: Optional[int], swhid: Optional[str], replace: bool, url: str, name: Optional[str], authors: List[str], temp_dir: str, ) -> Dict[str, Any]: """Parse the client subcommand options and make sure the combination is acceptable*. If not, an InputError exception is raised explaining the issue. By acceptable, we mean: - A multipart deposit (create or update) requires: - an existing software archive - an existing metadata file or author(s) and name provided in params - A binary deposit (create/update) requires an existing software archive - A metadata deposit (create/update) requires an existing metadata file or author(s) and name provided in params - A deposit update requires a deposit_id This will not prevent all failure cases though. The remaining errors are already dealt with by the underlying api client. Raises: InputError explaining the user input related issue MaintenanceError explaining the api status Returns: dict with the following keys: "archive": the software archive to deposit "username": username "metadata": the metadata file to deposit "collection": the user's collection under which to put the deposit "create_origin": the origin concerned by the deposit "metadata_provenance_url": the metadata provenance url "in_progress": if the deposit is partial or not "url": deposit's server main entry point "deposit_id": optional deposit identifier "swhid": optional deposit swhid "replace": whether the given deposit is to be replaced or not """ if not metadata: if name and authors: metadata_path = os.path.join(temp_dir, "metadata.xml") logging.debug("Temporary file: %s", metadata_path) metadata_xml = generate_metadata( username, name, authors, external_id=slug, create_origin=create_origin, metadata_provenance_url=metadata_provenance_url, ) logging.debug("Metadata xml generated: %s", metadata_xml) with open(metadata_path, "w") as f: f.write(metadata_xml) metadata = metadata_path elif archive is not None and not partial and not deposit_id: # If we meet all the following conditions: # * this is not an archive-only deposit request # * it is not part of a multipart deposit (either create/update # or finish) # * it misses either name or authors raise InputError( "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided. " ) elif name or authors: # If we are generating metadata, then all mandatory metadata # must be present raise InputError( "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided." ) else: # TODO: this is a multipart deposit, we might want to check that # metadata are deposited at some point pass elif name or authors or create_origin: raise InputError( "Using --metadata flag is incompatible with " "--author and --name and --create-origin (those are used to generate one " "metadata file)." ) if not archive and not metadata: raise InputError( "Please provide an actionable command. See --help for more information" ) if metadata: from xml.etree import ElementTree from swh.deposit.utils import ( parse_swh_deposit_origin, parse_swh_metadata_provenance, ) metadata_tree = ElementTree.fromstring(open(metadata).read()) (create_origin, add_to_origin) = parse_swh_deposit_origin(metadata_tree) if create_origin and add_to_origin: logger.error( "The metadata file provided must not contain both " '"" and "" tags', ) elif not create_origin and not add_to_origin: logger.warning( "The metadata file provided should contain " '"" or "" tag', ) meta_prov_url = parse_swh_metadata_provenance(metadata_tree) if not meta_prov_url: logger.warning( "The metadata file provided should contain " '"" tag' ) if replace and not deposit_id: raise InputError("To update an existing deposit, you must provide its id") if not collection: collection = _collection(client) return { "archive": archive, "username": username, "metadata": metadata, "collection": collection, "slug": slug, "in_progress": partial, "url": url, "deposit_id": deposit_id, "swhid": swhid, "replace": replace, } def _subdict(d: Dict[str, Any], keys: Collection[str]) -> Dict[str, Any]: "return a dict from d with only given keys" return {k: v for k, v in d.items() if k in keys} def credentials_decorator(f): """Add default --url, --username and --password flag to cli. """ f = click.option( "--password", required=True, help="(Mandatory) User's associated password" )(f) f = click.option("--username", required=True, help="(Mandatory) User's name")(f) f = click.option( "--url", default="https://deposit.softwareheritage.org", help=( "(Optional) Deposit server api endpoint. By default, " "https://deposit.softwareheritage.org/1" ), )(f) return f def output_format_decorator(f): """Add --format output flag decorator to cli. """ return click.option( "-f", "--format", "output_format", default="logging", type=click.Choice(["logging", "yaml", "json"]), help="Output format results.", )(f) @deposit.command() @credentials_decorator @click.option( "--archive", type=click.Path(exists=True), help="(Optional) Software archive to deposit", ) @click.option( "--metadata", type=click.Path(exists=True), help=( "(Optional) Path to xml metadata file. If not provided, " "this will use a file named .metadata.xml" ), ) @click.option( "--archive-deposit/--no-archive-deposit", default=False, help="Deprecated (ignored)", ) @click.option( "--metadata-deposit/--no-metadata-deposit", default=False, help="Deprecated (ignored)", ) @click.option( "--collection", help="(Optional) User's collection. If not provided, this will be fetched.", ) @click.option( "--slug", help=( "(Deprecated) (Optional) External system information identifier. " "If not provided, it will be generated" ), ) @click.option( "--create-origin", help=( "(Optional) Origin url to attach information to. To be used alongside " "--name and --author. This will be generated alongside the metadata to " "provide to the deposit server." ), ) @click.option( "--metadata-provenance-url", help=( "(Optional) Provenance metadata url to indicate from where the metadata is " "coming from." ), ) @click.option( "--partial/--no-partial", default=False, help=( "(Optional) The deposit will be partial, other deposits " "will have to take place to finalize it." ), ) @click.option( "--deposit-id", default=None, help="(Optional) Update an existing partial deposit with its identifier", ) @click.option( "--swhid", default=None, help="(Optional) Update existing completed deposit (status done) with new metadata", ) @click.option( "--replace/--no-replace", default=False, help="(Optional) Update by replacing existing metadata to a deposit", ) @click.option("--verbose/--no-verbose", default=False, help="Verbose mode") @click.option("--name", help="Software name") @click.option( "--author", multiple=True, help="Software author(s), this can be repeated as many times" " as there are authors", ) @output_format_decorator @click.pass_context def upload( ctx, username: str, password: str, archive: Optional[str], metadata: Optional[str], archive_deposit: bool, metadata_deposit: bool, collection: Optional[str], slug: Optional[str], create_origin: Optional[str], metadata_provenance_url: Optional[str], partial: bool, deposit_id: Optional[int], swhid: Optional[str], replace: bool, url: str, verbose: bool, name: Optional[str], author: List[str], output_format: Optional[str], ): """Software Heritage Public Deposit Client Create/Update deposit through the command line. More documentation can be found at https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html. """ import tempfile from swh.deposit.client import PublicApiDepositClient if archive_deposit or metadata_deposit: warnings.warn( '"archive_deposit" and "metadata_deposit" option arguments are ' "deprecated and have no effect; simply do not provide the archive " "for a metadata-only deposit, and do not provide a metadata for a" "archive-only deposit.", DeprecationWarning, ) if slug: if create_origin and slug != create_origin: raise InputError( '"--slug" flag has been deprecated in favor of "--create-origin" flag. ' "You mentioned both with different values, please only " 'use "--create-origin".' ) warnings.warn( '"--slug" flag has been deprecated in favor of "--create-origin" flag. ' 'Please, start using "--create-origin" instead of "--slug"', DeprecationWarning, ) url = _url(url) client = PublicApiDepositClient(url=url, auth=(username, password)) with tempfile.TemporaryDirectory() as temp_dir: with trap_and_report_exceptions(): logger.debug("Parsing cli options") config = client_command_parse_input( client, username, archive, metadata, collection, slug, create_origin, metadata_provenance_url, partial, deposit_id, swhid, replace, url, name, author, temp_dir, ) if verbose: logger.info("Parsed configuration: %s", config) keys = [ "archive", "collection", "in_progress", "metadata", "slug", ] if config["deposit_id"]: keys += ["deposit_id", "replace", "swhid"] data = client.deposit_update(**_subdict(config, keys)) else: data = client.deposit_create(**_subdict(config, keys)) print_result(data, output_format) @deposit.command() @credentials_decorator @click.option("--deposit-id", default=None, required=True, help="Deposit identifier.") @output_format_decorator @click.pass_context def status(ctx, url, username, password, deposit_id, output_format): """Deposit's status """ from swh.deposit.client import PublicApiDepositClient url = _url(url) logger.debug("Status deposit") with trap_and_report_exceptions(): client = PublicApiDepositClient(url=_url(url), auth=(username, password)) collection = _collection(client) print_result( client.deposit_status(collection=collection, deposit_id=deposit_id), output_format, ) def print_result(data: Dict[str, Any], output_format: Optional[str]) -> None: """Display the result data into a dedicated output format. """ import json import yaml if output_format == "json": click.echo(json.dumps(data)) elif output_format == "yaml": click.echo(yaml.dump(data)) else: logger.info(data) @deposit.command("metadata-only") @credentials_decorator @click.option( "--metadata", "metadata_path", type=click.Path(exists=True), required=True, help="Path to xml metadata file", ) @output_format_decorator @click.pass_context def metadata_only(ctx, url, username, password, metadata_path, output_format): """Deposit metadata only upload """ from xml.etree import ElementTree from swh.deposit.client import PublicApiDepositClient from swh.deposit.utils import parse_swh_metadata_provenance, parse_swh_reference # Parse to check for a swhid presence within the metadata file with open(metadata_path, "r") as f: metadata_raw = f.read() metadata_tree = ElementTree.fromstring(metadata_raw) actual_swhid = parse_swh_reference(metadata_tree) if not actual_swhid: raise InputError("A SWHID must be provided for a metadata-only deposit") meta_prov_url = parse_swh_metadata_provenance(metadata_tree) if not meta_prov_url: logger.warning( "A '' should be provided for a metadata-only " "deposit" ) with trap_and_report_exceptions(): client = PublicApiDepositClient(url=_url(url), auth=(username, password)) collection = _collection(client) result = client.deposit_metadata_only(collection, metadata_path) print_result(result, output_format) @deposit.command("list") @credentials_decorator @output_format_decorator @click.option( "--page", default=1, help="Page number when requesting more information", ) @click.option( "--page-size", default=100, help="Page number when requesting more information", ) @click.pass_context def deposit_list(ctx, url, username, password, output_format, page, page_size): """Client deposit listing """ from swh.deposit.client import PublicApiDepositClient url = _url(url) logger.debug("List deposits for user %s", username) with trap_and_report_exceptions(): client = PublicApiDepositClient(url=_url(url), auth=(username, password)) collection = _collection(client) result = client.deposit_list(collection, page=page, page_size=page_size) print_result(result, output_format) diff --git a/swh/deposit/tests/api/test_delete.py b/swh/deposit/tests/api/test_delete.py index 5eeb758a..af274d87 100644 --- a/swh/deposit/tests/api/test_delete.py +++ b/swh/deposit/tests/api/test_delete.py @@ -1,134 +1,135 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from typing import Dict, Mapping +import xml.etree.ElementTree as ET from django.urls import reverse_lazy as reverse from rest_framework import status -import xmltodict from swh.deposit.config import ( ARCHIVE_TYPE, DEPOSIT_STATUS_DEPOSITED, EDIT_IRI, EM_IRI, METADATA_TYPE, ) from swh.deposit.models import Deposit, DepositRequest +from swh.deposit.utils import NAMESPACES def count_deposit_request_types(deposit_requests) -> Mapping[str, int]: deposit_request_types = defaultdict(int) # type: Dict[str, int] for dr in deposit_requests: deposit_request_types[dr.type] += 1 return deposit_request_types def test_delete_archive_on_partial_deposit_works( authenticated_client, partial_deposit_with_metadata, deposit_collection ): """Removing partial deposit's archive should return a 204 response """ deposit_id = partial_deposit_with_metadata.id deposit = Deposit.objects.get(pk=deposit_id) deposit_requests = DepositRequest.objects.filter(deposit=deposit) # deposit request type: 'archive', 1 'metadata' deposit_request_types = count_deposit_request_types(deposit_requests) assert deposit_request_types == {ARCHIVE_TYPE: 1, METADATA_TYPE: 1} # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_204_NO_CONTENT deposit = Deposit.objects.get(pk=deposit_id) deposit_requests2 = DepositRequest.objects.filter(deposit=deposit) deposit_request_types = count_deposit_request_types(deposit_requests2) assert deposit_request_types == {METADATA_TYPE: 1} def test_delete_archive_on_undefined_deposit_fails( authenticated_client, deposit_collection, sample_archive ): """Delete undefined deposit returns a 404 response """ # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, 999]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_404_NOT_FOUND def test_delete_non_partial_deposit( authenticated_client, deposit_collection, deposited_deposit ): """Delete !partial status deposit should return a 400 response """ deposit = deposited_deposit assert deposit.status == DEPOSIT_STATUS_DEPOSITED # when update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(update_uri) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( - xmltodict.parse(response.content)["sword:error"]["summary"] + ET.fromstring(response.content).findtext("atom:summary", namespaces=NAMESPACES) == "You can only act on deposit with status 'partial'" ) deposit = Deposit.objects.get(pk=deposit.id) assert deposit is not None def test_delete_partial_deposit( authenticated_client, deposit_collection, partial_deposit ): """Delete deposit should return a 204 response """ # given deposit = partial_deposit # when url = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(url) # then assert response.status_code == status.HTTP_204_NO_CONTENT deposit_requests = list(DepositRequest.objects.filter(deposit=deposit)) assert deposit_requests == [] deposits = list(Deposit.objects.filter(pk=deposit.id)) assert deposits == [] def test_delete_on_edit_iri_cannot_delete_non_partial_deposit( authenticated_client, deposit_collection, complete_deposit ): """Delete !partial deposit should return a 400 response """ # given deposit = complete_deposit # when url = reverse(EDIT_IRI, args=[deposit_collection.name, deposit.id]) response = authenticated_client.delete(url) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( - xmltodict.parse(response.content)["sword:error"]["summary"] + ET.fromstring(response.content).findtext("atom:summary", namespaces=NAMESPACES) == "You can only act on deposit with status 'partial'" ) deposit = Deposit.objects.get(pk=deposit.id) assert deposit is not None diff --git a/swh/deposit/tests/api/test_deposit_update_binary.py b/swh/deposit/tests/api/test_deposit_update_binary.py index 1a6febbe..22e97240 100644 --- a/swh/deposit/tests/api/test_deposit_update_binary.py +++ b/swh/deposit/tests/api/test_deposit_update_binary.py @@ -1,428 +1,428 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests updates on EM-IRI""" from io import BytesIO +import xml.etree.ElementTree as ET from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse_lazy as reverse from rest_framework import status -import xmltodict from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED, EM_IRI, SE_IRI from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import ( check_archive, create_arborescence_archive, post_archive, post_atom, put_archive, put_atom, ) from swh.deposit.utils import NAMESPACES def test_post_deposit_binary_and_post_to_add_another_archive( authenticated_client, deposit_collection, sample_archive, tmp_path ): """Updating a deposit should return a 201 with receipt """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="true", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_id = response_content.findtext("swh:deposit_id", namespaces=NAMESPACES) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == "partial" assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit assert deposit_request.type == "archive" check_archive(sample_archive["name"], deposit_request.archive.name) # 2nd archive to upload archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) # uri to update the content update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) # adding another archive for the deposit and finalizing it response = post_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = list( DepositRequest.objects.filter(deposit=deposit).order_by("id") ) # 2 deposit requests for the same deposit assert len(deposit_requests) == 2 assert deposit_requests[0].deposit == deposit assert deposit_requests[0].type == "archive" check_archive(sample_archive["name"], deposit_requests[0].archive.name) assert deposit_requests[1].deposit == deposit assert deposit_requests[1].type == "archive" check_archive(archive2["name"], deposit_requests[1].archive.name) # only 1 deposit in db deposits = Deposit.objects.all() assert len(deposits) == 1 def test_replace_archive_to_deposit_is_possible( tmp_path, partial_deposit, deposit_collection, authenticated_client, sample_archive, atom_dataset, ): """Replace all archive with another one should return a 204 response """ tmp_path = str(tmp_path) # given deposit = partial_deposit requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(sample_archive["name"], requests[0].archive.name) # we have no metadata for that deposit requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 0 response = post_atom( authenticated_client, reverse(SE_IRI, args=[deposit_collection.name, deposit.id]), data=atom_dataset["entry-data1"], HTTP_SLUG=deposit.external_id, HTTP_IN_PROGRESS=True, ) requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = put_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_204_NO_CONTENT requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(archive2["name"], requests[0].archive.name) # check we did not touch the other parts requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 def test_add_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 997 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.post( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( response_content.findtext("atom:summary", namespaces=NAMESPACES) == "Deposit %s does not exist" % unknown_deposit_id ) def test_replace_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Replacing archive to unknown deposit should return a 404 response """ unknown_deposit_id = 996 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.put( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( response_content.findtext("atom:summary", namespaces=NAMESPACES) == "Deposit %s does not exist" % unknown_deposit_id ) def test_add_archive_to_deposit_is_possible( tmp_path, authenticated_client, deposit_collection, partial_deposit_with_metadata, sample_archive, ): """Add another archive to a deposit return a 201 response """ tmp_path = str(tmp_path) deposit = partial_deposit_with_metadata requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests) == 1 check_archive(sample_archive["name"], requests[0].archive.name) requests_meta0 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta0) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) response = post_archive( authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="archive").order_by( "id" ) assert len(requests) == 2 # first archive still exists check_archive(sample_archive["name"], requests[0].archive.name) # a new one was added check_archive(archive2["name"], requests[1].archive.name) # check we did not touch the other parts requests_meta1 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta1) == 1 assert set(requests_meta0) == set(requests_meta1) def test_post_deposit_then_update_refused( authenticated_client, deposit_collection, sample_archive, atom_dataset, tmp_path ): """Updating a deposit with status 'ready' should return a 400 """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = post_archive( authenticated_client, url, sample_archive, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_id = response_content.findtext("swh:deposit_id", namespaces=NAMESPACES) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit check_archive(sample_archive["name"], deposit_request.archive.name) # updating/adding is forbidden # uri to update the content edit_iri = reverse("edit_iri", args=[deposit_collection.name, deposit_id]) se_iri = reverse("se_iri", args=[deposit_collection.name, deposit_id]) em_iri = reverse("em_iri", args=[deposit_collection.name, deposit_id]) # Testing all update/add endpoint should fail # since the status is ready archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file 2" ) # replacing file is no longer possible since the deposit's # status is ready r = put_archive( authenticated_client, em_iri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert ( - xmltodict.parse(r.content)["sword:error"]["summary"] + ET.fromstring(r.content).findtext("atom:summary", namespaces=NAMESPACES) == "You can only act on deposit with status 'partial'" ) # adding file is no longer possible since the deposit's status # is ready r = post_archive( authenticated_client, em_iri, archive2, HTTP_SLUG=external_id, HTTP_IN_PROGRESS="false", ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert ( - xmltodict.parse(r.content)["sword:error"]["summary"] + ET.fromstring(r.content).findtext("atom:summary", namespaces=NAMESPACES) == "You can only act on deposit with status 'partial'" ) # replacing metadata is no longer possible since the deposit's # status is ready r = put_atom( authenticated_client, edit_iri, data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert ( - xmltodict.parse(r.content)["sword:error"]["summary"] + ET.fromstring(r.content).findtext("atom:summary", namespaces=NAMESPACES) == "You can only act on deposit with status 'partial'" ) # adding new metadata is no longer possible since the # deposit's status is ready r = post_atom( authenticated_client, se_iri, data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert ( - xmltodict.parse(r.content)["sword:error"]["summary"] + ET.fromstring(r.content).findtext("atom:summary", namespaces=NAMESPACES) == "You can only act on deposit with status 'partial'" ) archive_content = b"some content representing archive" archive = InMemoryUploadedFile( BytesIO(archive_content), field_name="archive0", name="archive0", content_type="application/zip", size=len(archive_content), charset=None, ) atom_entry = InMemoryUploadedFile( BytesIO(atom_dataset["entry-data-deposit-binary"].encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(atom_dataset["entry-data-deposit-binary"]), charset="utf-8", ) # replacing multipart metadata is no longer possible since the # deposit's status is ready r = authenticated_client.put( edit_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert ( - xmltodict.parse(r.content)["sword:error"]["summary"] + ET.fromstring(r.content).findtext("atom:summary", namespaces=NAMESPACES) == "You can only act on deposit with status 'partial'" ) # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( se_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert ( - xmltodict.parse(r.content)["sword:error"]["summary"] + ET.fromstring(r.content).findtext("atom:summary", namespaces=NAMESPACES) == "You can only act on deposit with status 'partial'" )