@contextmanager
def aggregate_tarballs(extraction_dir, archive_paths):
    """Aggregate multiple tarballs into one and yield this new archive's path.

    This is a context manager: the temporary working directory (which holds
    the aggregated archive) is removed when the ``with`` block exits, and also
    when building the aggregated archive fails.

    Args:
        extraction_dir (path): Path to use for the tarballs computation
        archive_paths ([str]): Deposit's archive paths

    Yields:
        Path to the aggregated tar archive. The file only exists for the
        duration of the ``with`` block.

    """
    # rebuild one tar archive from (possibly) multiple ones
    os.makedirs(extraction_dir, 0o755, exist_ok=True)
    dir_path = tempfile.mkdtemp(prefix="swh.deposit-", dir=extraction_dir)

    # Everything below runs under the try so that a failure while
    # uncompressing or archiving does not leave dir_path behind.
    try:
        # root folder to build an aggregated tarball
        aggregated_tarball_rootdir = os.path.join(dir_path, "aggregate")
        os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True)

        # uncompress in a temporary location all archives
        for archive_path in archive_paths:
            tarball.uncompress(archive_path, aggregated_tarball_rootdir)

        # Aggregate into one big tarball the multiple smaller ones
        temp_tarpath = shutil.make_archive(
            aggregated_tarball_rootdir, "tar", aggregated_tarball_rootdir
        )
        # can already clean up the uncompressed content, only the archive
        # is needed from now on
        shutil.rmtree(aggregated_tarball_rootdir)

        yield temp_tarpath
    finally:
        shutil.rmtree(dir_path)
class APIReadMetadata(APIPrivateView, APIGet, DepositReadMixin):
    """Class in charge of aggregating metadata on a deposit.

    """

    def _parse_dates(
        self, deposit: Deposit, metadata: ElementTree.Element
    ) -> Tuple[dict, dict]:
        """Normalize the date to use as a tuple of author date, committer date
           from the incoming metadata.

        The author date is read from ``codemeta:dateCreated`` and the committer
        date from ``codemeta:datePublished``. When only one of them is present,
        it is used for both dates. When neither is present, the deposit's
        ``complete_date`` is used for both.

        Args:
            deposit: Deposit whose ``complete_date`` is the fallback date
            metadata: Parsed metadata document to look the dates up in

        Returns:
            Tuple of author date, committer date. Those dates are
            swh normalized.

        """
        # ``find`` returns the first matching element (or None)
        commit_date_elt = metadata.find("codemeta:datePublished", namespaces=NAMESPACES)
        author_date_elt = metadata.find("codemeta:dateCreated", namespaces=NAMESPACES)

        author_date: Any
        commit_date: Any

        if author_date_elt is None and commit_date_elt is None:
            author_date = commit_date = deposit.complete_date
        elif commit_date_elt is None:
            author_date = commit_date = author_date_elt.text  # type: ignore
        elif author_date_elt is None:
            author_date = commit_date = commit_date_elt.text
        else:
            author_date = author_date_elt.text
            commit_date = commit_date_elt.text

        return (normalize_date(author_date), normalize_date(commit_date))

    def metadata_read(self, deposit: Deposit) -> Dict[str, Any]:
        """Read and aggregate multiple deposit information into one unified dictionary.

        Args:
            deposit: Deposit to retrieve information from

        Returns:
            Dictionary of deposit information read by the deposit loader, with the
            following keys:

                **origin** (Dict): Information about the origin

                **raw_metadata** (str): Raw metadata received for the deposit

                **provider** (Dict): the metadata provider information about the
                deposit client

                **tool** (Dict): the deposit information

                **deposit** (Dict): deposit information relevant to build the revision
                (author_date, committer_date, etc...)

        """
        raw_metadata = self._metadata_get(deposit)
        author_date: Optional[dict]
        commit_date: Optional[dict]
        if raw_metadata:
            metadata_tree = ElementTree.fromstring(raw_metadata)
            author_date, commit_date = self._parse_dates(deposit, metadata_tree)
            release_notes_elements = metadata_tree.findall(
                "codemeta:releaseNotes", namespaces=NAMESPACES
            )
        else:
            # No metadata to parse: there is no tree to look dates or release
            # notes up in, so fall back to "nothing" for all of them.
            author_date = commit_date = None
            release_notes_elements = []

        if deposit.parent and deposit.parent.swhid:
            parent_swhid = deposit.parent.swhid
            assert parent_swhid is not None
            swhid = CoreSWHID.from_string(parent_swhid)
            parent_revision = hash_to_hex(swhid.object_id)
            parents = [parent_revision]
        else:
            parents = []

        release_notes: Optional[str]
        if release_notes_elements:
            # Multiple release notes are concatenated, separated by an empty
            # line; elements without text are skipped.
            release_notes = "\n\n".join(
                element.text for element in release_notes_elements if element.text
            )
        else:
            release_notes = None

        return {
            "origin": {"type": "deposit", "url": deposit.origin_url},
            "provider": {
                "provider_name": deposit.client.last_name,
                "provider_url": deposit.client.provider_url,
                "provider_type": MetadataAuthorityType.DEPOSIT_CLIENT.value,
                "metadata": {},
            },
            "tool": self.tool,
            "raw_metadata": raw_metadata,
            "deposit": {
                "id": deposit.id,
                "client": deposit.client.username,
                "collection": deposit.collection.name,
                "author": SWH_PERSON,
                "author_date": author_date,
                "committer": SWH_PERSON,
                "committer_date": commit_date,
                "revision_parents": parents,
                "release_notes": release_notes,
            },
        }

    def process_get(
        self, request, collection_name: str, deposit: Deposit
    ) -> Tuple[int, Dict, str]:
        """Return the aggregated metadata of the deposit as a json payload.

        Args:
            request (Request):
            collection_name: Collection owning the deposit
            deposit: Deposit concerned by the reading

        Returns:
            Tuple status, aggregated deposit metadata, content-type

        """
        data = self.metadata_read(deposit)
        return status.HTTP_200_OK, data if data else {}, "application/json"
def generate_metadata(
    deposit_client: str,
    name: str,
    authors: List[str],
    external_id: Optional[str] = None,
    create_origin: Optional[str] = None,
    metadata_provenance_url: Optional[str] = None,
) -> str:
    """Generate sword compliant xml metadata with the minimum required metadata.

    The Atom spec, https://tools.ietf.org/html/rfc4287, says that:

    - atom:entry elements MUST contain one or more atom:author elements
    - atom:entry elements MUST contain exactly one atom:title element.
    - atom:entry elements MUST contain exactly one atom:updated element.

    However, we are also using CodeMeta, so we want some basic information to be
    mandatory.

    Therefore, we generate the following mandatory fields:

    - http://www.w3.org/2005/Atom#updated
    - http://www.w3.org/2005/Atom#author
    - http://www.w3.org/2005/Atom#title
    - https://doi.org/10.5063/SCHEMA/CODEMETA-2.0#name (yes, in addition to
      http://www.w3.org/2005/Atom#title, even if they have somewhat the same
      meaning)
    - https://doi.org/10.5063/SCHEMA/CODEMETA-2.0#author

    Args:
        deposit_client: Deposit client username,
        name: Software name
        authors: List of author names
        external_id: Optional identifier, emitted as a codemeta:identifier
          element when provided
        create_origin: Origin concerned by the deposit
        metadata_provenance_url: Provenance metadata url

    Returns:
        metadata xml string

    """
    # generate a metadata file with the minimum required metadata
    document = ET.Element(f"{{{NS['atom']}}}entry")
    now = datetime.now(tz=timezone.utc)
    ET.SubElement(document, f"{{{NS['atom']}}}updated").text = str(now)
    ET.SubElement(document, f"{{{NS['atom']}}}author").text = deposit_client
    ET.SubElement(document, f"{{{NS['atom']}}}title").text = name
    ET.SubElement(document, f"{{{NS['codemeta']}}}name").text = name
    for author_name in authors:
        author = ET.SubElement(document, f"{{{NS['codemeta']}}}author")
        ET.SubElement(author, f"{{{NS['codemeta']}}}name").text = author_name

    if external_id:
        ET.SubElement(document, f"{{{NS['codemeta']}}}identifier").text = external_id

    # The swh:deposit element is built detached and only attached to the
    # document when it ends up with at least one child.
    swh_deposit_elt = ET.Element(f"{{{NS['swh']}}}deposit")

    if create_origin:
        elt = ET.SubElement(swh_deposit_elt, f"{{{NS['swh']}}}create_origin")
        ET.SubElement(elt, f"{{{NS['swh']}}}origin").set("url", create_origin)

    if metadata_provenance_url:
        elt = ET.SubElement(swh_deposit_elt, f"{{{NS['swh']}}}metadata-provenance")
        ET.SubElement(elt, f"{{{NS['schema']}}}url").text = metadata_provenance_url

    if len(swh_deposit_elt):
        document.append(swh_deposit_elt)

    s = ET.tostring(document, encoding="utf-8").decode()
    # Use the module logger (not the root logger) like the rest of this module
    logger.debug("Atom entry dict to generate as xml: %s", s)
    return s
The remaining errors are already dealt with by the underlying api client. Raises: InputError explaining the user input related issue MaintenanceError explaining the api status Returns: dict with the following keys: "archive": the software archive to deposit "username": username "metadata": the metadata file to deposit "collection": the user's collection under which to put the deposit "create_origin": the origin concerned by the deposit "metadata_provenance_url": the metadata provenance url "in_progress": if the deposit is partial or not "url": deposit's server main entry point "deposit_id": optional deposit identifier "swhid": optional deposit swhid "replace": whether the given deposit is to be replaced or not """ if not metadata: if name and authors: metadata_path = os.path.join(temp_dir, "metadata.xml") logging.debug("Temporary file: %s", metadata_path) metadata_xml = generate_metadata( username, name, authors, external_id=slug, create_origin=create_origin, metadata_provenance_url=metadata_provenance_url, ) logging.debug("Metadata xml generated: %s", metadata_xml) with open(metadata_path, "w") as f: f.write(metadata_xml) metadata = metadata_path elif archive is not None and not partial and not deposit_id: # If we meet all the following conditions: # * this is not an archive-only deposit request # * it is not part of a multipart deposit (either create/update # or finish) # * it misses either name or authors raise InputError( "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided. " ) elif name or authors: # If we are generating metadata, then all mandatory metadata # must be present raise InputError( "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided." 
) else: # TODO: this is a multipart deposit, we might want to check that # metadata are deposited at some point pass elif name or authors or create_origin: raise InputError( "Using --metadata flag is incompatible with " "--author and --name and --create-origin (those are used to generate one " "metadata file)." ) if not archive and not metadata: raise InputError( "Please provide an actionable command. See --help for more information" ) if metadata: from xml.etree import ElementTree from swh.deposit.utils import ( parse_swh_deposit_origin, parse_swh_metadata_provenance, ) metadata_tree = ElementTree.fromstring(open(metadata).read()) (create_origin, add_to_origin) = parse_swh_deposit_origin(metadata_tree) if create_origin and add_to_origin: logger.error( "The metadata file provided must not contain both " '"" and "" tags', ) elif not create_origin and not add_to_origin: logger.warning( "The metadata file provided should contain " '"" or "" tag', ) meta_prov_url = parse_swh_metadata_provenance(metadata_tree) if not meta_prov_url: logger.warning( "The metadata file provided should contain " '"" tag' ) if replace and not deposit_id: raise InputError("To update an existing deposit, you must provide its id") if not collection: collection = _collection(client) return { "archive": archive, "username": username, "metadata": metadata, "collection": collection, "slug": slug, "in_progress": partial, "url": url, "deposit_id": deposit_id, "swhid": swhid, "replace": replace, } def _subdict(d: Dict[str, Any], keys: Collection[str]) -> Dict[str, Any]: "return a dict from d with only given keys" return {k: v for k, v in d.items() if k in keys} def credentials_decorator(f): """Add default --url, --username and --password flag to cli. 
""" f = click.option( "--password", required=True, help="(Mandatory) User's associated password" )(f) f = click.option("--username", required=True, help="(Mandatory) User's name")(f) f = click.option( "--url", default="https://deposit.softwareheritage.org", help=( "(Optional) Deposit server api endpoint. By default, " "https://deposit.softwareheritage.org/1" ), )(f) return f def output_format_decorator(f): """Add --format output flag decorator to cli. """ return click.option( "-f", "--format", "output_format", default="logging", type=click.Choice(["logging", "yaml", "json"]), help="Output format results.", )(f) @deposit.command() @credentials_decorator @click.option( "--archive", type=click.Path(exists=True), help="(Optional) Software archive to deposit", ) @click.option( "--metadata", type=click.Path(exists=True), help=( "(Optional) Path to xml metadata file. If not provided, " "this will use a file named .metadata.xml" ), ) @click.option( "--archive-deposit/--no-archive-deposit", default=False, help="Deprecated (ignored)", ) @click.option( "--metadata-deposit/--no-metadata-deposit", default=False, help="Deprecated (ignored)", ) @click.option( "--collection", help="(Optional) User's collection. If not provided, this will be fetched.", ) @click.option( "--slug", help=( "(Deprecated) (Optional) External system information identifier. " "If not provided, it will be generated" ), ) @click.option( "--create-origin", help=( "(Optional) Origin url to attach information to. To be used alongside " "--name and --author. This will be generated alongside the metadata to " "provide to the deposit server." ), ) @click.option( "--metadata-provenance-url", help=( "(Optional) Provenance metadata url to indicate from where the metadata is " "coming from." ), ) @click.option( "--partial/--no-partial", default=False, help=( "(Optional) The deposit will be partial, other deposits " "will have to take place to finalize it." 
), ) @click.option( "--deposit-id", default=None, help="(Optional) Update an existing partial deposit with its identifier", ) @click.option( "--swhid", default=None, help="(Optional) Update existing completed deposit (status done) with new metadata", ) @click.option( "--replace/--no-replace", default=False, help="(Optional) Update by replacing existing metadata to a deposit", ) @click.option("--verbose/--no-verbose", default=False, help="Verbose mode") @click.option("--name", help="Software name") @click.option( "--author", multiple=True, help="Software author(s), this can be repeated as many times" " as there are authors", ) @output_format_decorator @click.pass_context def upload( ctx, username: str, password: str, archive: Optional[str], metadata: Optional[str], archive_deposit: bool, metadata_deposit: bool, collection: Optional[str], slug: Optional[str], create_origin: Optional[str], metadata_provenance_url: Optional[str], partial: bool, deposit_id: Optional[int], swhid: Optional[str], replace: bool, url: str, verbose: bool, name: Optional[str], author: List[str], output_format: Optional[str], ): """Software Heritage Public Deposit Client Create/Update deposit through the command line. More documentation can be found at https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html. """ import tempfile from swh.deposit.client import PublicApiDepositClient if archive_deposit or metadata_deposit: warnings.warn( '"archive_deposit" and "metadata_deposit" option arguments are ' "deprecated and have no effect; simply do not provide the archive " "for a metadata-only deposit, and do not provide a metadata for a" "archive-only deposit.", DeprecationWarning, ) if slug: if create_origin and slug != create_origin: raise InputError( '"--slug" flag has been deprecated in favor of "--create-origin" flag. ' "You mentioned both with different values, please only " 'use "--create-origin".' 
def print_result(data: Dict[str, Any], output_format: Optional[str]) -> None:
    """Output the result data according to the requested output format.

    "json" and "yaml" formats are serialized and echoed through click; any
    other value sends the data to the module logger at info level.

    """
    import json

    import yaml

    if output_format not in ("json", "yaml"):
        logger.info(data)
        return

    serialize = json.dumps if output_format == "json" else yaml.dump
    click.echo(serialize(data))
def private_get_raw_url_endpoints(collection, deposit):
    """List the 2 private metadata read endpoints to check for a deposit.

    Args:
        collection: Object whose ``name`` is used in the first endpoint
        deposit: Either a deposit identifier (int) or an object with an ``id``

    Returns:
        The endpoint with the collection name first, then the one without.

    """
    if isinstance(deposit, int):
        deposit_id = deposit
    else:
        deposit_id = deposit.id

    url_with_collection = reverse(
        PRIVATE_GET_DEPOSIT_METADATA, args=[collection.name, deposit_id]
    )
    url_without_collection = reverse(PRIVATE_GET_DEPOSIT_METADATA_NC, args=[deposit_id])
    return [url_with_collection, url_without_collection]
def test_read_metadata_revision_with_parent(
    authenticated_client, deposit_collection, partial_deposit, atom_dataset
):
    """Reading the private metadata of a deposit with a parent lists that parent

    The parent deposit's swhid targets a revision; its hex id is expected as
    the sole entry of "revision_parents".

    """
    deposit = partial_deposit
    deposit.external_id = "some-external-id"
    deposit.origin_url = f"https://hal-test.archives-ouvertes.fr/{deposit.external_id}"
    deposit.save()

    metadata_xml_raw = atom_dataset["entry-data2"]
    deposit = update_deposit_with_metadata(
        authenticated_client, deposit_collection, deposit, metadata_xml_raw,
    )

    # attach a parent deposit referencing a revision
    rev_id = "da78a9d4cf1d5d29873693fd496142e3a18c20fa"
    parent_deposit = Deposit(
        swhid="swh:1:rev:%s" % rev_id,
        client=deposit.client,
        collection=deposit.collection,
    )
    parent_deposit.save()
    deposit.parent = parent_deposit
    deposit.save()

    # author and committer dates are expected to be the same
    expected_date = {
        "offset": 0,
        "timestamp": {"microseconds": 0, "seconds": 1507389428},
    }
    expected_data = {
        "origin": {
            "type": "deposit",
            "url": "https://hal-test.archives-ouvertes.fr/some-external-id",
        },
        "raw_metadata": metadata_xml_raw,
        "provider": {
            "metadata": {},
            "provider_name": "",
            "provider_type": "deposit_client",
            "provider_url": "https://hal-test.archives-ouvertes.fr/",
        },
        "tool": {
            "configuration": {"sword_version": "2"},
            "name": "swh-deposit",
            "version": __version__,
        },
        "deposit": {
            "author": SWH_PERSON,
            "committer": SWH_PERSON,
            "committer_date": expected_date,
            "author_date": expected_date,
            "client": "test",
            "id": deposit.id,
            "collection": "test",
            "revision_parents": [rev_id],
            "release_notes": "This is the release of October 7th, 2017.",
        },
    }

    # both endpoints (with and without collection) must answer the same
    for endpoint in private_get_raw_url_endpoints(deposit_collection, deposit):
        response = authenticated_client.get(endpoint)

        assert response.status_code == status.HTTP_200_OK
        assert response["content-type"] == "application/json"
        assert response.json() == expected_data
"application/json" actual_data = response.json() assert actual_data == { "origin": { "type": "deposit", "url": "https://hal-test.archives-ouvertes.fr/hal-01243065", }, - "metadata_raw": metadata_xml_raw, + "raw_metadata": metadata_xml_raw, "provider": { "metadata": {}, "provider_name": "", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", }, "tool": { "configuration": {"sword_version": "2"}, "name": "swh-deposit", "version": __version__, }, "deposit": { "author": SWH_PERSON, "committer": SWH_PERSON, "committer_date": { "offset": 120, "timestamp": {"microseconds": 0, "seconds": 1493820527}, }, "author_date": { "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1507389428}, }, "client": deposit_collection.name, "id": deposit.id, "collection": deposit_collection.name, "revision_parents": [], "release_notes": "This is the release of October 7th, 2017.", }, } def test_read_metadata_4( authenticated_client, deposit_collection, atom_dataset, partial_deposit ): """dateCreated/datePublished not provided, revision uses complete_date """ deposit = partial_deposit codemeta_entry_data = atom_dataset["metadata"] % "" deposit = update_deposit_with_metadata( authenticated_client, deposit_collection, deposit, codemeta_entry_data ) # will use the deposit completed date as fallback date deposit.complete_date = "2016-04-06" deposit.save() for url in private_get_raw_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK assert response["content-type"] == "application/json" actual_data = response.json() assert actual_data == { "origin": {"type": "deposit", "url": None,}, - "metadata_raw": codemeta_entry_data, + "raw_metadata": codemeta_entry_data, "provider": { "metadata": {}, "provider_name": "", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", }, "tool": { "configuration": {"sword_version": "2"}, "name": 
"swh-deposit", "version": __version__, }, "deposit": { "author": SWH_PERSON, "committer": SWH_PERSON, "committer_date": { "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1459900800}, }, "author_date": { "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1459900800}, }, "client": deposit_collection.name, "id": deposit.id, "collection": deposit_collection.name, "revision_parents": [], "release_notes": None, }, } def test_read_metadata_5( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """dateCreated/datePublished provided, revision uses author/committer date If multiple dateCreated provided, the first occurrence (of dateCreated) is selected. If multiple datePublished provided, the first occurrence (of datePublished) is selected. """ deposit = partial_deposit # add metadata to the deposit with multiple datePublished/dateCreated codemeta_entry_data = ( atom_dataset["metadata"] % """ 2015-04-06T17:08:47+02:00 2017-05-03T16:08:47+02:00 2016-04-06T17:08:47+02:00 2018-05-03T16:08:47+02:00 """ ) deposit = update_deposit_with_metadata( authenticated_client, deposit_collection, deposit, codemeta_entry_data ) for url in private_get_raw_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK assert response["content-type"] == "application/json" actual_data = response.json() assert actual_data == { "origin": { "type": "deposit", "url": "https://hal-test.archives-ouvertes.fr/hal-01243065", }, - "metadata_raw": codemeta_entry_data, + "raw_metadata": codemeta_entry_data, "provider": { "metadata": {}, "provider_name": "", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", }, "tool": { "configuration": {"sword_version": "2"}, "name": "swh-deposit", "version": __version__, }, "deposit": { "author": SWH_PERSON, "committer": SWH_PERSON, "committer_date": { "offset": 120, "timestamp": {"microseconds": 0, "seconds": 1493820527}, 
}, "author_date": { "offset": 120, "timestamp": {"microseconds": 0, "seconds": 1428332927}, }, "client": deposit_collection.name, "id": deposit.id, "collection": deposit_collection.name, "revision_parents": [], "release_notes": None, }, } def test_access_to_nonexisting_deposit_returns_404_response( authenticated_client, deposit_collection, ): """Read unknown collection should return a 404 response """ unknown_id = 999 try: Deposit.objects.get(pk=unknown_id) except Deposit.DoesNotExist: assert True for url in private_get_raw_url_endpoints(deposit_collection, unknown_id): response = authenticated_client.get(url) assert response.status_code == status.HTTP_404_NOT_FOUND msg = "Deposit %s does not exist" % unknown_id assert msg in response.content.decode("utf-8") def test_read_metadata_multiple_release_notes( authenticated_client, deposit_collection, partial_deposit, atom_dataset ): """Private metadata read api to existing deposit should return metadata """ deposit = partial_deposit deposit.external_id = "some-external-id" deposit.origin_url = f"https://hal-test.archives-ouvertes.fr/{deposit.external_id}" deposit.save() metadata_xml_raw = atom_dataset["entry-data-multiple-release-notes"] deposit = update_deposit_with_metadata( authenticated_client, deposit_collection, deposit, metadata_xml_raw, ) for url in private_get_raw_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK assert response["content-type"] == "application/json" actual_data = response.json() assert actual_data == { "origin": { "type": "deposit", "url": "https://hal-test.archives-ouvertes.fr/some-external-id", }, - "metadata_raw": metadata_xml_raw, + "raw_metadata": metadata_xml_raw, "provider": { "metadata": {}, "provider_name": "", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", }, "tool": { "configuration": {"sword_version": "2"}, "name": "swh-deposit", "version": __version__, 
}, "deposit": { "author": SWH_PERSON, "committer": SWH_PERSON, "committer_date": { "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1507389428}, }, "author_date": { "offset": 0, "timestamp": {"microseconds": 0, "seconds": 1507389428}, }, "client": "test", "id": deposit.id, "collection": "test", "revision_parents": [], "release_notes": ( "This is the release of October 7th, 2017.\n\n" "It fixes some bugs." ), }, } diff --git a/swh/deposit/tests/cli/test_client.py b/swh/deposit/tests/cli/test_client.py index 185fe87a..fd753b7d 100644 --- a/swh/deposit/tests/cli/test_client.py +++ b/swh/deposit/tests/cli/test_client.py @@ -1,1176 +1,1176 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import ast import contextlib import json import logging import os from typing import Optional from unittest.mock import MagicMock from xml.etree import ElementTree import pytest import yaml from swh.deposit.api.checks import ( METADATA_PROVENANCE_KEY, SUGGESTED_FIELDS_MISSING, check_metadata, ) from swh.deposit.cli import deposit as cli from swh.deposit.cli.client import InputError, _collection, _url, generate_metadata from swh.deposit.client import ( BaseDepositClient, MaintenanceError, PublicApiDepositClient, ServiceDocumentDepositClient, ) from swh.deposit.parsers import parse_xml from swh.deposit.utils import NAMESPACES from swh.model.exceptions import ValidationError from ..conftest import TEST_USER def generate_slug() -> str: """Generate a slug (sample purposes). 
""" import uuid return str(uuid.uuid4()) @pytest.fixture def datadir(request): """Override default datadir to target main test datadir""" return os.path.join(os.path.dirname(str(request.fspath)), "../data") @pytest.fixture def slug(): return generate_slug() @pytest.fixture def patched_tmp_path(tmp_path, mocker): mocker.patch( "tempfile.TemporaryDirectory", return_value=contextlib.nullcontext(str(tmp_path)), ) return tmp_path @pytest.fixture def client_mock_api_down(mocker, slug): """A mock client whose connection with api fails due to maintenance issue """ mock_client = MagicMock() mocker.patch("swh.deposit.client.PublicApiDepositClient", return_value=mock_client) mock_client.service_document.side_effect = MaintenanceError( "Database backend maintenance: Temporarily unavailable, try again later." ) return mock_client def test_cli_url(): assert _url("http://deposit") == "http://deposit/1" assert _url("https://other/1") == "https://other/1" def test_cli_collection_error(): mock_client = MagicMock() mock_client.service_document.return_value = {"error": "something went wrong"} with pytest.raises(InputError) as e: _collection(mock_client) assert "Service document retrieval: something went wrong" == str(e.value) def test_cli_collection_ok(requests_mock_datadir): client = PublicApiDepositClient( url="https://deposit.swh.test/1", auth=("test", "test") ) collection_name = _collection(client) assert collection_name == "test" def test_cli_collection_ko_because_downtime(): mock_client = MagicMock() mock_client.service_document.side_effect = MaintenanceError("downtime") with pytest.raises(MaintenanceError, match="downtime"): _collection(mock_client) def test_cli_upload_conflictual_flags( datadir, requests_mock_datadir, cli_runner, atom_dataset, tmp_path, ): """Post metadata-only deposit through cli with invalid swhid raises """ api_url_basename = "deposit.test.metadataonly" metadata = atom_dataset["entry-data-minimal"] metadata_path = os.path.join(tmp_path, 
"entry-data-minimal.xml") with open(metadata_path, "w") as f: f.write(metadata) with pytest.raises(InputError, match="both with different values"): # fmt: off cli_runner.invoke( cli, [ "upload", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--slug", "some-slug", # deprecated flag "--create-origin", "some-other-slug", # conflictual value, so raise "--format", "json", ], catch_exceptions=False, ) # fmt: on def test_cli_deposit_with_server_down_for_maintenance( sample_archive, caplog, client_mock_api_down, slug, patched_tmp_path, cli_runner ): """ Deposit failure due to maintenance down time should be explicit """ # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", ], ) # fmt: on assert result.exit_code == 1, result.output assert result.output == "" down_for_maintenance_log_record = ( "swh.deposit.cli.client", logging.ERROR, "Database backend maintenance: Temporarily unavailable, try again later.", ) assert down_for_maintenance_log_record in caplog.record_tuples client_mock_api_down.service_document.assert_called_once_with() def test_cli_client_generate_metadata_ok(slug): """Generated metadata is well formed and pass service side metadata checks """ actual_metadata_xml = generate_metadata( "deposit-client", "project-name", authors=["some", "authors"], external_id="http://example.org/external-id", create_origin="origin-url", metadata_provenance_url="meta-prov-url", ) actual_metadata = parse_xml(actual_metadata_xml) assert ( actual_metadata.findtext("atom:author", namespaces=NAMESPACES) == "deposit-client" ) assert ( actual_metadata.findtext("atom:title", namespaces=NAMESPACES) == "project-name" ) assert actual_metadata.findtext("atom:updated", namespaces=NAMESPACES) is 
not None assert ( actual_metadata.findtext("codemeta:name", namespaces=NAMESPACES) == "project-name" ) assert ( actual_metadata.findtext("codemeta:identifier", namespaces=NAMESPACES) == "http://example.org/external-id" ) authors = actual_metadata.findall( "codemeta:author/codemeta:name", namespaces=NAMESPACES ) assert len(authors) == 2 assert authors[0].text == "some" assert authors[1].text == "authors" assert ( actual_metadata.find( "swh:deposit/swh:create_origin/swh:origin", namespaces=NAMESPACES ).attrib["url"] == "origin-url" ) assert ( actual_metadata.findtext( "swh:deposit/swh:metadata-provenance/schema:url", namespaces=NAMESPACES ) == "meta-prov-url" ) checks_ok, detail = check_metadata(ElementTree.fromstring(actual_metadata_xml)) assert checks_ok is True assert detail is None def test_cli_client_generate_metadata_ok2(slug): """Generated metadata is well formed and pass service side metadata checks """ actual_metadata_xml = generate_metadata( "deposit-client", "project-name", authors=["some", "authors"], ) actual_metadata = parse_xml(actual_metadata_xml) assert ( actual_metadata.findtext("atom:author", namespaces=NAMESPACES) == "deposit-client" ) assert ( actual_metadata.findtext("atom:title", namespaces=NAMESPACES) == "project-name" ) assert actual_metadata.findtext("atom:updated", namespaces=NAMESPACES) is not None assert ( actual_metadata.findtext("codemeta:name", namespaces=NAMESPACES) == "project-name" ) authors = actual_metadata.findall( "codemeta:author/codemeta:name", namespaces=NAMESPACES ) assert len(authors) == 2 assert authors[0].text == "some" assert authors[1].text == "authors" assert actual_metadata.find("codemeta:identifier", namespaces=NAMESPACES) is None assert actual_metadata.find("swh:deposit", namespaces=NAMESPACES) is None checks_ok, detail = check_metadata(ElementTree.fromstring(actual_metadata_xml)) assert checks_ok is True assert detail == { "metadata": [ {"summary": SUGGESTED_FIELDS_MISSING, "fields": [METADATA_PROVENANCE_KEY]} ] } 
def test_cli_single_minimal_deposit_with_slug(
    sample_archive,
    slug,
    patched_tmp_path,
    requests_mock_datadir,
    cli_runner,
    caplog,
):
    """Upload a single deposit through the cli using the deprecated --slug flag.

    cf. https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit

    The metadata file generated by the cli must carry the provided fields, and
    exactly one warning is expected since --slug is used instead of
    --create-origin.

    """  # noqa
    metadata_path = os.path.join(patched_tmp_path, "metadata.xml")
    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", "https://deposit.swh.test/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--name", "test-project",
            "--archive", sample_archive["path"],
            "--metadata-provenance-url", "meta-prov-url",
            "--author", "Jane Doe",
            "--slug", slug,
            "--format", "json",
        ],
    )
    # fmt: on

    assert result.exit_code == 0, result.output
    assert json.loads(result.output) == {
        "deposit_id": "615",
        "deposit_status": "partial",
        "deposit_status_detail": None,
        "deposit_date": "2020-10-08T13:52:34.509655Z",
    }

    # The cli wrote the generated metadata in the (patched) temporary directory
    with open(metadata_path) as fd:
        actual_metadata = parse_xml(fd.read())

    assert (
        actual_metadata.findtext("atom:author", namespaces=NAMESPACES)
        == TEST_USER["username"]
    )
    assert (
        actual_metadata.findtext("codemeta:name", namespaces=NAMESPACES)
        == "test-project"
    )
    assert (
        actual_metadata.findtext("atom:title", namespaces=NAMESPACES)
        == "test-project"
    )
    assert actual_metadata.findtext("atom:updated", namespaces=NAMESPACES) is not None
    assert (
        actual_metadata.findtext("codemeta:identifier", namespaces=NAMESPACES) == slug
    )
    authors = actual_metadata.findall(
        "codemeta:author/codemeta:name", namespaces=NAMESPACES
    )
    assert len(authors) == 1
    assert authors[0].text == "Jane Doe"

    count_warnings = sum(
        1 for (_, log_level, _) in caplog.record_tuples if log_level == logging.WARNING
    )
    assert (
        count_warnings == 1
    ), "We should have 1 warning as we are using slug instead of create_origin"


def test_cli_single_minimal_deposit_with_create_origin(
    sample_archive,
    slug,
    patched_tmp_path,
    requests_mock_datadir,
    cli_runner,
    caplog,
):
    """Upload a single deposit through the cli using the --create-origin flag.

    cf. https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit

    The metadata file generated by the cli must carry the provided fields
    (origin to create and metadata provenance url included), and no warning is
    expected.

    """  # noqa
    metadata_path = os.path.join(patched_tmp_path, "metadata.xml")
    origin = slug
    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", "https://deposit.swh.test/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--name", "test-project",
            "--archive", sample_archive["path"],
            "--author", "Jane Doe",
            "--create-origin", origin,
            "--metadata-provenance-url", "meta-prov-url",
            "--format", "json",
        ],
    )
    # fmt: on

    assert result.exit_code == 0, result.output
    assert json.loads(result.output) == {
        "deposit_id": "615",
        "deposit_status": "partial",
        "deposit_status_detail": None,
        "deposit_date": "2020-10-08T13:52:34.509655Z",
    }

    # The cli wrote the generated metadata in the (patched) temporary directory
    with open(metadata_path) as fd:
        actual_metadata = parse_xml(fd.read())

    assert (
        actual_metadata.findtext("atom:author", namespaces=NAMESPACES)
        == TEST_USER["username"]
    )
    assert (
        actual_metadata.findtext("codemeta:name", namespaces=NAMESPACES)
        == "test-project"
    )
    assert (
        actual_metadata.findtext("atom:title", namespaces=NAMESPACES)
        == "test-project"
    )
    assert actual_metadata.findtext("atom:updated", namespaces=NAMESPACES) is not None
    assert (
        actual_metadata.find(
            "swh:deposit/swh:create_origin/swh:origin", namespaces=NAMESPACES
        ).attrib["url"]
        == origin
    )
    assert (
        actual_metadata.findtext(
            "swh:deposit/swh:metadata-provenance/schema:url", namespaces=NAMESPACES
        )
        == "meta-prov-url"
    )
    authors = actual_metadata.findall(
        "codemeta:author/codemeta:name", namespaces=NAMESPACES
    )
    assert len(authors) == 1
    assert authors[0].text == "Jane Doe"

    count_warnings = sum(
        1 for (_, log_level, _) in caplog.record_tuples if log_level == logging.WARNING
    )
    assert (
        count_warnings == 0
    ), "We should have no warning as we are using create_origin"


def test_cli_validation_metadata(
    sample_archive, caplog, patched_tmp_path, cli_runner, slug
):
    """Multiple metadata flags scenario (missing, conflicts) properly fails the calls

    Each scenario must exit with code 1, print nothing on the standard output and
    log the expected error record.

    """
    metadata_path = os.path.join(patched_tmp_path, "metadata.xml")
    with open(metadata_path, "a"):
        pass  # creates the file

    for flag_title_or_name, author_or_name in [
        ("--author", "no one"),
        ("--name", "test-project"),
    ]:
        # Only one of --author / --name is provided: test missing name, then
        # missing author
        # fmt: off
        result = cli_runner.invoke(
            cli,
            [
                "upload",
                "--url", "https://deposit.swh.test/1",
                "--username", TEST_USER["username"],
                "--password", TEST_USER["password"],
                "--archive", sample_archive["path"],
                "--slug", slug,
                flag_title_or_name,
                author_or_name,
            ],
        )
        # fmt: on
        assert result.exit_code == 1, f"unexpected result: {result.output}"
        assert result.output == ""
        # NOTE: the trailing space in the message below is expected as is
        expected_error_log_record = (
            "swh.deposit.cli.client",
            logging.ERROR,
            (
                "Problem during parsing options: "
                "For metadata deposit request, either a metadata file with "
                "--metadata or both --author and --name must be provided. "
            ),
        )
        assert expected_error_log_record in caplog.record_tuples

        # Clear mocking state
        caplog.clear()

        # Deposit update (--deposit-id) with --name only: neither --metadata nor
        # --author is provided
        # fmt: off
        result = cli_runner.invoke(
            cli,
            [
                "upload",
                "--url", "https://deposit.swh.test/1",
                "--username", TEST_USER["username"],
                "--password", TEST_USER["password"],
                "--name", "test-project",
                "--deposit-id", 666,
                "--archive", sample_archive["path"],
                "--slug", slug,
            ],
        )
        # fmt: on
        assert result.exit_code == 1, f"unexpected result: {result.output}"
        assert result.output == ""
        expected_error_log_record = (
            "swh.deposit.cli.client",
            logging.ERROR,
            (
                "Problem during parsing options: "
                "For metadata deposit request, either a metadata file with "
                "--metadata or both --author and --name must be provided."
            ),
        )
        assert expected_error_log_record in caplog.record_tuples

        # Clear mocking state
        caplog.clear()

        # incompatible flags check: --metadata provided along with --author
        # fmt: off
        result = cli_runner.invoke(
            cli,
            [
                "upload",
                "--url", "https://deposit.swh.test/1",
                "--username", TEST_USER["username"],
                "--password", TEST_USER["password"],
                "--archive", sample_archive["path"],
                "--metadata", metadata_path,
                "--author", "Jane Doe",
                "--slug", slug,
            ],
        )
        # fmt: on
        assert result.exit_code == 1, result.output
        assert result.output == ""
        expected_error_log_record = (
            "swh.deposit.cli.client",
            logging.ERROR,
            (
                "Problem during parsing options: "
                "Using --metadata flag is incompatible with --author "
                "and --name and --create-origin (those are used to generate "
                "one metadata file)."
            ),
        )
        assert expected_error_log_record in caplog.record_tuples
        caplog.clear()


def test_cli_validation_no_actionable_command(caplog, cli_runner):
    """Upload with neither archive nor metadata (no actionable command) fails"""
    # no actionable command
    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", "https://deposit.swh.test/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--partial",
        ],
    )
    # fmt: on
    assert result.exit_code == 1, result.output
    assert result.output == ""
    expected_error_log_record = (
        "swh.deposit.cli.client",
        logging.ERROR,
        (
            "Problem during parsing options: "
            "Please provide an actionable command. See --help for more information"
        ),
    )
    assert expected_error_log_record in caplog.record_tuples


def test_cli_validation_replace_with_no_deposit_id_fails(
    sample_archive, caplog, patched_tmp_path, requests_mock_datadir, datadir, cli_runner
):
    """--replace flags require --deposit-id otherwise fails"""
    metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml")
    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", "https://deposit.swh.test/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--metadata", metadata_path,
            "--archive", sample_archive["path"],
            "--replace",
        ],
    )
    # fmt: on
    assert result.exit_code == 1, result.output
    assert result.output == ""
    expected_error_log_record = (
        "swh.deposit.cli.client",
        logging.ERROR,
        (
            "Problem during parsing options: "
            "To update an existing deposit, you must provide its id"
        ),
    )
    assert expected_error_log_record in caplog.record_tuples


def test_cli_single_deposit_slug_generation(
    sample_archive, patched_tmp_path, requests_mock_datadir, cli_runner
):
    """Single deposit scenario without providing the slug, it should
    not be generated.

    """
    metadata_path = os.path.join(patched_tmp_path, "metadata.xml")
    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", "https://deposit.swh.test/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--name", "test-project",
            "--archive", sample_archive["path"],
            "--author", "Jane Doe",
            "--format", "json",
        ],
    )
    # fmt: on
    assert result.exit_code == 0, result.output
    assert json.loads(result.output) == {
        "deposit_id": "615",
        "deposit_status": "partial",
        "deposit_status_detail": None,
        "deposit_date": "2020-10-08T13:52:34.509655Z",
    }

    with open(metadata_path) as fd:
        metadata_xml = fd.read()
    actual_metadata = parse_xml(metadata_xml)
    # Look the element up explicitly: a `"tag" not in element` membership test
    # compares the string against child elements, so it can never fail.
    assert actual_metadata.find("codemeta:identifier", namespaces=NAMESPACES) is None


def test_cli_multisteps_deposit(
    sample_archive, datadir, slug, requests_mock_datadir, cli_runner
):
    """First deposit a partial deposit (no metadata, only archive), then update the
    metadata part.

    https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#multisteps-deposit

    """  # noqa
    api_url = "https://deposit.test.metadata/1"
    deposit_id = 666

    # Create a partial deposit with only 1 archive
    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", api_url,
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--archive", sample_archive["path"],
            "--slug", slug,
            "--format", "json",
            "--partial",
        ],
    )
    # fmt: on
    assert result.exit_code == 0, f"unexpected output: {result.output}"
    actual_deposit = json.loads(result.output)
    assert actual_deposit == {
        "deposit_id": str(deposit_id),
        "deposit_status": "partial",
        "deposit_status_detail": None,
        "deposit_date": "2020-10-08T13:52:34.509655Z",
    }

    # Update the partial deposit with only 1 archive
    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", api_url,
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--archive", sample_archive["path"],
            "--deposit-id", deposit_id,
            "--slug", slug,
            "--format", "json",
            "--partial",  # in-progress: True, because remains the metadata to upload
        ],
    )
    # fmt: on
    assert result.exit_code == 0, f"unexpected output: {result.output}"
    assert result.output is not None
    actual_deposit = json.loads(result.output)
    # deposit update scenario actually returns a deposit status dict
    assert actual_deposit["deposit_id"] == str(deposit_id)
    assert actual_deposit["deposit_status"] == "partial"

    # Update the partial deposit with only some metadata (and then finalize it)
    # https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#add-content-or-metadata-to-the-deposit
    metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml")

    # Update deposit with metadata
    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", api_url,
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--metadata", metadata_path,
            "--deposit-id", deposit_id,
            "--slug", slug,
            "--format", "json",
        ],
        # this time, ^ we no longer flag it to partial, so the status changes to
        # in-progress false
    )
    # fmt: on
    assert result.exit_code == 0, f"unexpected output: {result.output}"
    assert result.output is not None
    actual_deposit = json.loads(result.output)
    # deposit update scenario actually returns a deposit status dict
    assert actual_deposit["deposit_id"] == str(deposit_id)
    # FIXME: should be "deposited" but current limitation in the
    # requests_mock_datadir_visits use, cannot find a way to make it work right now
    assert actual_deposit["deposit_status"] == "partial"


@pytest.mark.parametrize(
    "output_format,parser_fn",
    [
        ("json", json.loads),
        ("yaml", yaml.safe_load),
        (
            "logging",
            ast.literal_eval,
        ),  # not enough though, the caplog fixture is needed
    ],
)
def test_cli_deposit_status_with_output_format(
    output_format, parser_fn, datadir, slug, requests_mock_datadir, caplog, cli_runner
):
    """Check deposit status cli with all possible output formats (json, yaml, logging)."""
    api_url_basename = "deposit.test.status"
    deposit_id = 1033
    expected_deposit_status = {
        "deposit_id": str(deposit_id),
        "deposit_status": "done",
        "deposit_status_detail": (
            "The deposit has been successfully loaded into the "
            "Software Heritage archive"
        ),
        "deposit_swh_id": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea",
        "deposit_swh_id_context": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/",  # noqa
        "deposit_external_id": "check-deposit-2020-10-08T13:52:34.509655",
    }

    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "status",
            "--url", f"https://{api_url_basename}/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--deposit-id", deposit_id,
            "--format", output_format,
        ],
    )
    # fmt: on
    assert result.exit_code == 0, f"unexpected output: {result.output}"

    if output_format == "logging":
        # With the logging format, the status is logged, not printed
        assert len(caplog.record_tuples) == 1
        # caplog record tuple format: (logger name, log level, message)
        _, _, result_output = caplog.record_tuples[0]
    else:
        result_output = result.output

    actual_deposit = parser_fn(result_output)
    assert actual_deposit == expected_deposit_status


def test_cli_update_metadata_with_swhid_on_completed_deposit(
    datadir, requests_mock_datadir, cli_runner
):
    """Update new metadata on a completed deposit (status done) is ok"""
    api_url_basename = "deposit.test.updateswhid"
    deposit_id = 123
    expected_deposit_status = {
        "deposit_external_id": "check-deposit-2020-10-08T13:52:34.509655",
        "deposit_id": str(deposit_id),
        "deposit_status": "done",
        "deposit_status_detail": (
            "The deposit has been successfully loaded into the "
            "Software Heritage archive"
        ),
        "deposit_swh_id": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea",
        "deposit_swh_id_context": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/",  # noqa
    }

    # Sanity checks on the scenario preconditions
    assert expected_deposit_status["deposit_status"] == "done"
    assert expected_deposit_status["deposit_swh_id"] is not None

    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", f"https://{api_url_basename}/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--name", "test-project",
            "--author", "John Doe",
            "--deposit-id", deposit_id,
            "--swhid", expected_deposit_status["deposit_swh_id"],
            "--format", "json",
        ],
    )
    # fmt: on
    assert result.exit_code == 0, result.output
    actual_deposit_status = json.loads(result.output)
    assert "error" not in actual_deposit_status
    assert actual_deposit_status == expected_deposit_status


def test_cli_update_metadata_with_swhid_on_other_status_deposit(
    datadir, requests_mock_datadir, cli_runner
):
    """Update new metadata with swhid on other deposit status is not possible"""
    api_url_basename = "deposit.test.updateswhid"
    deposit_id = "321"

    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", f"https://{api_url_basename}/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--name", "test-project",
            "--author", "John Doe",
            "--deposit-id", deposit_id,
            "--swhid", "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea",
            "--format", "json",
        ],
    )
    # fmt: on
    # The cli exits normally, the failure is reported in the output
    assert result.exit_code == 0, result.output
    actual_result = json.loads(result.output)
    assert "error" in actual_result
    assert actual_result == {
        "error": "You can only update metadata on deposit with status 'done'",
        "detail": f"The deposit {deposit_id} has status 'partial'",
        "deposit_status": "partial",
        "deposit_id": deposit_id,
    }


@pytest.mark.parametrize(
    "metadata_entry_key", ["entry-data-with-swhid", "entry-data-with-swhid-no-prov"]
)
def test_cli_metadata_only_deposit_full_metadata_file(
    datadir,
    requests_mock_datadir,
    cli_runner,
    atom_dataset,
    tmp_path,
    metadata_entry_key,
    caplog,
):
    """Post metadata-only deposit through cli

    The metadata file posted by the client already contains the swhid. When the
    entry has no metadata provenance ("no-prov" entry), exactly one warning is
    expected, none otherwise.

    """
    api_url_basename = "deposit.test.metadataonly"
    swhid = "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea"
    atom_data = atom_dataset[metadata_entry_key]
    if metadata_entry_key == "entry-data-with-swhid":
        metadata = atom_data.format(
            swhid=swhid,
            metadata_provenance_url=(
                "https://inria.halpreprod.archives-ouvertes.fr/hal-abcdefgh"
            ),
        )
    else:
        metadata = atom_data.format(swhid=swhid)

    metadata_path = os.path.join(tmp_path, "entry-data-with-swhid.xml")
    with open(metadata_path, "w") as m:
        m.write(metadata)

    expected_deposit_status = {
        "deposit_id": "100",
        "deposit_status": "done",
        "deposit_date": "2020-10-08T13:52:34.509655Z",
    }
    assert expected_deposit_status["deposit_status"] == "done"

    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "metadata-only",
            "--url", f"https://{api_url_basename}/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--metadata", metadata_path,
            "--format", "json",
        ],
    )
    # fmt: on
    assert result.exit_code == 0, result.output
    actual_deposit_status = json.loads(result.output)
    assert "error" not in actual_deposit_status
    assert actual_deposit_status == expected_deposit_status

    warning_messages = [
        msg
        for (_, log_level, msg) in caplog.record_tuples
        if log_level == logging.WARNING
    ]
    if "no-prov" in metadata_entry_key:
        assert len(warning_messages) == 1
        assert "metadata-provenance>' should be provided" in warning_messages[-1]
    else:
        assert len(warning_messages) == 0


def test_cli_metadata_only_deposit_invalid_swhid(
    datadir,
    requests_mock_datadir,
    cli_runner,
    atom_dataset,
    tmp_path,
):
    """Post metadata-only deposit through cli with invalid swhid raises"""
    api_url_basename = "deposit.test.metadataonly"
    invalid_swhid = "ssh:2:sth:xxx"
    metadata = atom_dataset["entry-data-with-swhid-no-prov"].format(swhid=invalid_swhid)
    metadata_path = os.path.join(tmp_path, "entry-data-with-swhid.xml")
    with open(metadata_path, "w") as f:
        f.write(metadata)

    # catch_exceptions=False so the runner lets the exception propagate
    with pytest.raises(ValidationError, match="Invalid"):
        # fmt: off
        cli_runner.invoke(
            cli,
            [
                "metadata-only",
                "--url", f"https://{api_url_basename}/1",
                "--username", TEST_USER["username"],
                "--password", TEST_USER["password"],
                "--metadata", metadata_path,
                "--format", "json",
            ],
            catch_exceptions=False,
        )
        # fmt: on


def test_cli_metadata_only_deposit_no_swhid(
    datadir,
    requests_mock_datadir,
    cli_runner,
    atom_dataset,
    tmp_path,
):
    """Post metadata-only deposit through cli without any swhid raises"""
    api_url_basename = "deposit.test.metadataonly"
    metadata = atom_dataset["entry-data-minimal"]
    metadata_path = os.path.join(tmp_path, "entry-data-minimal.xml")
    with open(metadata_path, "w") as f:
        f.write(metadata)

    # catch_exceptions=False so the runner lets the exception propagate
    with pytest.raises(InputError, match="SWHID must be provided"):
        # fmt: off
        cli_runner.invoke(
            cli,
            [
                "metadata-only",
                "--url", f"https://{api_url_basename}/1",
                "--username", TEST_USER["username"],
                "--password", TEST_USER["password"],
                "--metadata", metadata_path,
                "--format", "json",
            ],
            catch_exceptions=False,
        )
        # fmt: on


@pytest.mark.parametrize(
    "metadata_entry_key", ["entry-data-with-add-to-origin", "entry-only-create-origin"]
)
def test_cli_deposit_warning_missing_origin(
    metadata_entry_key,
    tmp_path,
    atom_dataset,
    caplog,
    cli_runner,
    requests_mock_datadir,
):
    """Deposit cli should not warn when the provided metadata xml has an origin tag"""
    # No warning should be logged for these deposits: per the dataset entry names,
    # either an add-to-origin or a create-origin tag is provided, and the metadata
    # provenance is expected to always be provided.
    raw_metadata = atom_dataset[metadata_entry_key] % "some-url"
    metadata_path = os.path.join(tmp_path, "metadata-with-origin-tag-to-deposit.xml")
    with open(metadata_path, "w") as f:
        f.write(raw_metadata)

    # fmt: off
    cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", "https://deposit.swh.test/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--metadata", metadata_path,
        ],
    )
    # fmt: on

    for (_, log_level, _) in caplog.record_tuples:
        # all messages are info or below messages so everything is fine
        assert log_level < logging.WARNING


def test_cli_deposit_warning_missing_provenance_url(
    tmp_path,
    atom_dataset,
    caplog,
    cli_runner,
    requests_mock_datadir,
):
    """Deposit cli should warn when no metadata provenance is provided"""
    atom_template = atom_dataset["entry-data-with-add-to-origin-no-prov"]
    raw_metadata = atom_template % "some-url"
    metadata_path = os.path.join(tmp_path, "metadata-with-missing-prov-url.xml")
    with open(metadata_path, "w") as f:
        f.write(raw_metadata)

    # fmt: off
    cli_runner.invoke(
        cli,
        [
            "upload",
            "--url", "https://deposit.swh.test/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--metadata", metadata_path,
        ],
    )
    # fmt: on

    count_warnings = sum(
        1 for (_, log_level, _) in caplog.record_tuples if log_level == logging.WARNING
    )
    assert count_warnings == 1


def test_cli_failure_should_be_parseable(atom_dataset, mocker):
    """Error responses are parsed into a dict with summary and verbose description"""
    summary = "Cannot load metadata"
    verbose_description = (
        "Cannot load metadata on swh:1:dir:0eda267e7d3c2e37b3f6a78e542b16190ac4574e, "
        "this directory object does not exist in the archive (yet?)."
    )

    error_xml = atom_dataset["error-cli"].format(
        summary=summary, verboseDescription=verbose_description
    )

    api_call = BaseDepositClient(url="https://somewhere.org/")
    actual_error = api_call.parse_result_error(error_xml)

    assert actual_error == {
        "summary": summary,
        "detail": "",
        "sword:verboseDescription": verbose_description,
    }


def test_cli_service_document_failure(atom_dataset, mocker):
    """Ensure service document failures are properly served"""
    summary = "Invalid user credentials"
    error_xml = atom_dataset["error-cli"].format(summary=summary, verboseDescription="")

    api_call = ServiceDocumentDepositClient(url="https://somewhere.org/")
    actual_error = api_call.parse_result_error(error_xml)

    assert actual_error == {"error": summary}


@pytest.mark.parametrize(
    "output_format,parser_fn",
    [
        ("json", json.loads),
        ("yaml", yaml.safe_load),
        (
            "logging",
            ast.literal_eval,
        ),  # not enough though, the caplog fixture is needed
    ],
)
def test_cli_deposit_collection_list(
    output_format, parser_fn, datadir, slug, requests_mock_datadir, caplog, cli_runner
):
    """Check deposit list cli with all possible output formats (json, yaml, logging)."""
    api_url_basename = "deposit.test.list"

    expected_deposits = {
        "count": "3",
        "deposits": [
            {
                "external_id": "check-deposit-2020-10-09T13:10:00.000000",
                "id": "1031",
                "status": "rejected",
                "status_detail": "Deposit without archive",
            },
            {
                "external_id": "check-deposit-2020-10-10T13:20:00.000000",
                "id": "1032",
                "status": "rejected",
                "status_detail": "Deposit without archive",
            },
            {
                "complete_date": "2020-10-08T13:52:34.509655",
                "external_id": "check-deposit-2020-10-08T13:52:34.509655",
                "id": "1033",
                "reception_date": "2020-10-08T13:50:30",
                "status": "done",
                "status_detail": "The deposit has been successfully loaded into "
                "the Software Heritage archive",
                "swhid": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea",
                "swhid_context": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/",  # noqa
            },
        ],
    }

    # fmt: off
    result = cli_runner.invoke(
        cli,
        [
            "list",
            "--url", f"https://{api_url_basename}/1",
            "--username", TEST_USER["username"],
            "--password", TEST_USER["password"],
            "--page", 1,
            "--page-size", 10,
            "--format", output_format,
        ],
    )
    # fmt: on
    assert result.exit_code == 0, f"unexpected output: {result.output}"

    if output_format == "logging":
        # With the logging format, the listing is logged, not printed
        assert len(caplog.record_tuples) == 1
        # caplog record tuple format: (logger name, log level, message)
        _, _, result_output = caplog.record_tuples[0]
    else:
        result_output = result.output

    actual_deposit = parser_fn(result_output)
    assert actual_deposit == expected_deposits