diff --git a/swh/deposit/cli/admin.py b/swh/deposit/cli/admin.py index 17446bf9..2ffcfb0b 100644 --- a/swh/deposit/cli/admin.py +++ b/swh/deposit/cli/admin.py @@ -1,275 +1,284 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control +from __future__ import annotations + +from typing import TYPE_CHECKING + import click from swh.deposit.cli import deposit +if TYPE_CHECKING: + from swh.deposit.models import DepositCollection + @deposit.group("admin") @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Optional extra configuration file.", ) @click.option( "--platform", default="development", type=click.Choice(["development", "production"]), help="development or production platform", ) @click.pass_context -def admin(ctx, config_file, platform): +def admin(ctx, config_file: str, platform: str): """Server administration tasks (manipulate user or collections)""" from swh.deposit.config import setup_django_for # configuration happens here setup_django_for(platform, config_file=config_file) @admin.group("user") @click.pass_context def user(ctx): """Manipulate user.""" # configuration happens here pass -def _create_collection(name): +def _create_collection(name: str) -> DepositCollection: """Create the collection with name if it does not exist. Args: - name (str): collection's name + name: collection name Returns: - collection (DepositCollection): the existing collection object - (created or not) + collection: the existing collection object """ # to avoid loading too early django namespaces from swh.deposit.models import DepositCollection try: collection = DepositCollection.objects.get(name=name) - click.echo("Collection %s exists, nothing to do." % name) + click.echo(f"Collection '{name}' exists, skipping.") except DepositCollection.DoesNotExist: - click.echo("Create new collection %s" % name) + click.echo(f"Create collection '{name}'.") collection = DepositCollection.objects.create(name=name) - click.echo("Collection %s created" % name) + click.echo(f"Collection '{name}' created.") return collection @user.command("create") @click.option("--username", required=True, help="User's name") @click.option("--password", required=True, help="Desired user's password (plain).") @click.option("--firstname", default="", help="User's first name") @click.option("--lastname", default="", help="User's last name") @click.option("--email", default="", help="User's email") @click.option("--collection", help="User's collection") @click.option("--provider-url", default="", help="Provider URL") @click.option("--domain", default="", help="The domain") @click.pass_context def user_create( ctx, - username, - password, - firstname, - lastname, - email, - collection, - provider_url, - domain, + username: str, + password: str, + firstname: str, + lastname: str, + email: str, + collection: str, + provider_url: str, + domain: str, ): """Create a user with some needed information (password, collection) If the collection does not exist, the collection is then created alongside. The password is stored encrypted using django's utilities. """ # to avoid loading too early django namespaces from swh.deposit.models import DepositClient # If collection is not provided, fallback to username if not collection: collection = username - click.echo("collection: %s" % collection) # create the collection if it does not exist - collection = _create_collection(collection) + collection_ = _create_collection(collection) # user create/update try: - user = DepositClient.objects.get(username=username) - click.echo("User %s exists, updating information." % user) + user = DepositClient.objects.get(username=username) # type: ignore + click.echo(f"Update user '{username}'.") user.set_password(password) + action_done = "updated" except DepositClient.DoesNotExist: - click.echo("Create new user %s" % username) - user = DepositClient.objects.create_user(username=username, password=password) + click.echo(f"Create user '{username}'.") + user = DepositClient.objects.create_user( # type: ignore + username=username, password=password + ) + action_done = "created" - user.collections = [collection.id] + user.collections = [collection_.id] user.first_name = firstname user.last_name = lastname user.email = email user.is_active = True user.provider_url = provider_url user.domain = domain user.save() - click.echo("Information registered for user %s" % user) + click.echo(f"User '{username}' {action_done}.") @user.command("list") @click.pass_context def user_list(ctx): """List existing users. This entrypoint is not paginated yet as there is not a lot of entry. """ # to avoid loading too early django namespaces from swh.deposit.models import DepositClient users = DepositClient.objects.all() if not users: output = "Empty user list" else: output = "\n".join((user.username for user in users)) click.echo(output) @user.command("exists") @click.argument("username", required=True) @click.pass_context -def user_exists(ctx, username): +def user_exists(ctx, username: str): """Check if user exists. """ # to avoid loading too early django namespaces from swh.deposit.models import DepositClient try: - DepositClient.objects.get(username=username) - click.echo("User %s exists." % username) + DepositClient.objects.get(username=username) # type: ignore + click.echo(f"User {username} exists.") ctx.exit(0) except DepositClient.DoesNotExist: - click.echo("User %s does not exist." % username) + click.echo(f"User {username} does not exist.") ctx.exit(1) @admin.group("collection") @click.pass_context def collection(ctx): """Manipulate collections.""" pass @collection.command("create") @click.option("--name", required=True, help="Collection's name") @click.pass_context def collection_create(ctx, name): _create_collection(name) @collection.command("list") @click.pass_context def collection_list(ctx): """List existing collections. This entrypoint is not paginated yet as there is not a lot of entry. """ # to avoid loading too early django namespaces from swh.deposit.models import DepositCollection collections = DepositCollection.objects.all() if not collections: output = "Empty collection list" else: output = "\n".join((col.name for col in collections)) click.echo(output) @admin.group("deposit") @click.pass_context def adm_deposit(ctx): """Manipulate deposit.""" pass @adm_deposit.command("reschedule") @click.option("--deposit-id", required=True, help="Deposit identifier") @click.pass_context def adm_deposit_reschedule(ctx, deposit_id): """Reschedule the deposit loading This will: - check the deposit's status to something reasonable (failed or done). That means that the checks have passed alright but something went wrong during the loading (failed: loading failed, done: loading ok, still for some reasons as in bugs, we need to reschedule it) - reset the deposit's status to 'verified' (prior to any loading but after the checks which are fine) and removes the different archives' identifiers (swh-id, ...) - trigger back the loading task through the scheduler """ # to avoid loading too early django namespaces from datetime import datetime from swh.deposit.config import ( DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_VERIFIED, APIConfig, ) from swh.deposit.models import Deposit try: deposit = Deposit.objects.get(pk=deposit_id) except Deposit.DoesNotExist: click.echo("Deposit %s does not exist." % deposit_id) ctx.exit(1) # Check the deposit is in a reasonable state accepted_statuses = [DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_LOAD_FAILURE] if deposit.status == DEPOSIT_STATUS_VERIFIED: click.echo("Deposit %s's status already set for rescheduling." % (deposit_id)) ctx.exit(0) if deposit.status not in accepted_statuses: click.echo( "Deposit %s's status be one of %s." % (deposit_id, ", ".join(accepted_statuses)) ) ctx.exit(1) task_id = deposit.load_task_id if not task_id: click.echo( "Deposit %s cannot be rescheduled. It misses the " "associated task." % deposit_id ) ctx.exit(1) # Reset the deposit's state deposit.swhid = None deposit.swhid_context = None deposit.status = DEPOSIT_STATUS_VERIFIED deposit.save() # Trigger back the deposit scheduler = APIConfig().scheduler scheduler.set_status_tasks( [task_id], status="next_run_not_scheduled", next_run=datetime.now() ) diff --git a/swh/deposit/tests/cli/conftest.py b/swh/deposit/tests/cli/conftest.py new file mode 100644 index 00000000..aa23c215 --- /dev/null +++ b/swh/deposit/tests/cli/conftest.py @@ -0,0 +1,12 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from click.testing import CliRunner +import pytest + + +@pytest.fixture +def cli_runner(): + return CliRunner() diff --git a/swh/deposit/tests/cli/test_admin.py b/swh/deposit/tests/cli/test_admin.py new file mode 100644 index 00000000..73262aab --- /dev/null +++ b/swh/deposit/tests/cli/test_admin.py @@ -0,0 +1,189 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.deposit.cli.admin import admin as cli +from swh.deposit.models import DepositClient, DepositCollection + + +@pytest.fixture(autouse=True) +def enable_db_access_for_all_tests(db): + pass + + +def test_cli_admin_user_list_nothing(cli_runner): + result = cli_runner.invoke(cli, ["user", "list",]) + + assert result.exit_code == 0, f"Unexpected output: {result.output}" + assert result.output == "Empty user list\n" + + +def test_cli_admin_user_list_with_users(cli_runner, deposit_user): + result = cli_runner.invoke(cli, ["user", "list",]) + + assert result.exit_code == 0, f"Unexpected output: {result.output}" + assert result.output == f"{deposit_user.username}\n" # only 1 user + + +def test_cli_admin_collection_list_nothing(cli_runner): + result = cli_runner.invoke(cli, ["collection", "list",]) + + assert result.exit_code == 0, f"Unexpected output: {result.output}" + assert result.output == "Empty collection list\n" + + +def test_cli_admin_collection_list_with_collections(cli_runner, deposit_collection): + from swh.deposit.tests.conftest import create_deposit_collection + + new_collection = create_deposit_collection("something") + + result = cli_runner.invoke(cli, ["collection", "list",]) + + assert result.exit_code == 0, f"Unexpected output: {result.output}" + collections = "\n".join([deposit_collection.name, new_collection.name]) + assert result.output == f"{collections}\n" + + +def test_cli_admin_user_exists_unknown(cli_runner): + result = cli_runner.invoke(cli, ["user", "exists", "unknown"]) + + assert result.exit_code == 1, f"Unexpected output: {result.output}" + assert result.output == "User unknown does not exist.\n" + + +def test_cli_admin_user_exists(cli_runner, deposit_user): + result = cli_runner.invoke(cli, ["user", "exists", deposit_user.username]) + + assert result.exit_code == 0, f"Unexpected output: {result.output}" + assert result.output == f"User {deposit_user.username} exists.\n" + + +def test_cli_admin_create_collection(cli_runner): + collection_name = "something" + + try: + DepositCollection.objects.get(name=collection_name) + except DepositCollection.DoesNotExist: + pass + + result = cli_runner.invoke( + cli, ["collection", "create", "--name", collection_name,] + ) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + collection = DepositCollection.objects.get(name=collection_name) + assert collection is not None + + assert ( + result.output + == f"""Create collection '{collection_name}'. +Collection '{collection_name}' created. +""" + ) + + result2 = cli_runner.invoke( + cli, ["collection", "create", "--name", collection_name,] + ) + assert result2.exit_code == 0, f"Unexpected output: {result.output}" + assert ( + result2.output + == f"""Collection '{collection_name}' exists, skipping. +""" + ) + + +def test_cli_admin_user_create(cli_runner): + user_name = "user" + collection_name = user_name + + try: + DepositClient.objects.get(username=user_name) + except DepositClient.DoesNotExist: + pass + + try: + DepositCollection.objects.get(name=collection_name) + except DepositCollection.DoesNotExist: + pass + + result = cli_runner.invoke( + cli, ["user", "create", "--username", user_name, "--password", "password",] + ) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + user = DepositClient.objects.get(username=user_name) + assert user is not None + collection = DepositCollection.objects.get(name=collection_name) + assert collection is not None + + assert ( + result.output + == f"""Create collection '{user_name}'. +Collection '{collection_name}' created. +Create user '{user_name}'. +User '{user_name}' created. +""" + ) + + assert collection.name == collection_name + assert user.username == user_name + first_password = user.password + assert first_password is not None + assert user.collections == [collection.id] + assert user.is_active is True + assert user.domain == "" + assert user.provider_url == "" + assert user.email == "" + assert user.first_name == "" + assert user.last_name == "" + + # create a user that already exists + result2 = cli_runner.invoke( + cli, + [ + "user", + "create", + "--username", + "user", + "--password", + "another-password", # changing password + "--collection", + collection_name, # specifying the collection this time + "--firstname", + "User", + "--lastname", + "no one", + "--email", + "user@org.org", + "--provider-url", + "http://some-provider.org", + "--domain", + "domain", + ], + ) + + assert result2.exit_code == 0, f"Unexpected output: {result2.output}" + user = DepositClient.objects.get(username=user_name) + assert user is not None + + assert user.username == user_name + assert user.collections == [collection.id] + assert user.is_active is True + second_password = user.password + assert second_password is not None + assert second_password != first_password, "Password should have changed" + assert user.domain == "domain" + assert user.provider_url == "http://some-provider.org" + assert user.email == "user@org.org" + assert user.first_name == "User" + assert user.last_name == "no one" + + assert ( + result2.output + == f"""Collection '{collection_name}' exists, skipping. +Update user '{user_name}'. +User '{user_name}' updated. +""" + ) diff --git a/swh/deposit/tests/cli/test_client.py b/swh/deposit/tests/cli/test_client.py index b5064e1d..2d879b3a 100644 --- a/swh/deposit/tests/cli/test_client.py +++ b/swh/deposit/tests/cli/test_client.py @@ -1,723 +1,717 @@ -# Copyright (C) 2019-2020 The Software Heritage developers +# Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import ast import contextlib import json import logging import os from unittest.mock import MagicMock -from click.testing import CliRunner import pytest import yaml from swh.deposit.cli import deposit as cli from swh.deposit.cli.client import InputError, _client, _collection, _url, generate_slug from swh.deposit.client import MaintenanceError, PublicApiDepositClient from swh.deposit.parsers import parse_xml from ..conftest import TEST_USER @pytest.fixture def deposit_config(): return { "url": "https://deposit.swh.test/1", "auth": {"username": "test", "password": "test",}, } @pytest.fixture def datadir(request): """Override default datadir to target main test datadir""" return os.path.join(os.path.dirname(str(request.fspath)), "../data") @pytest.fixture def slug(): return generate_slug() @pytest.fixture def patched_tmp_path(tmp_path, mocker): mocker.patch( "tempfile.TemporaryDirectory", return_value=contextlib.nullcontext(str(tmp_path)), ) return tmp_path -@pytest.fixture -def cli_runner(): - return CliRunner() - - @pytest.fixture def client_mock_api_down(mocker, slug): """A mock client whose connection with api fails due to maintenance issue """ mock_client = MagicMock() mocker.patch("swh.deposit.cli.client._client", return_value=mock_client) mock_client.service_document.side_effect = MaintenanceError( "Database backend maintenance: Temporarily unavailable, try again later." ) return mock_client def test_cli_url(): assert _url("http://deposit") == "http://deposit/1" assert _url("https://other/1") == "https://other/1" def test_cli_client(): client = _client("http://deposit", "user", "pass") assert isinstance(client, PublicApiDepositClient) def test_cli_collection_error(): mock_client = MagicMock() mock_client.service_document.return_value = {"error": "something went wrong"} with pytest.raises(InputError) as e: _collection(mock_client) assert "Service document retrieval: something went wrong" == str(e.value) def test_cli_collection_ok(deposit_config, requests_mock_datadir): client = PublicApiDepositClient(deposit_config) collection_name = _collection(client) assert collection_name == "test" def test_cli_collection_ko_because_downtime(): mock_client = MagicMock() mock_client.service_document.side_effect = MaintenanceError("downtime") with pytest.raises(MaintenanceError, match="downtime"): _collection(mock_client) def test_cli_deposit_with_server_down_for_maintenance( sample_archive, caplog, client_mock_api_down, slug, patched_tmp_path, cli_runner ): """ Deposit failure due to maintenance down time should be explicit """ result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", ], ) assert result.exit_code == 1, result.output assert result.output == "" down_for_maintenance_log_record = ( "swh.deposit.cli.client", logging.ERROR, "Database backend maintenance: Temporarily unavailable, try again later.", ) assert down_for_maintenance_log_record in caplog.record_tuples client_mock_api_down.service_document.assert_called_once_with() def test_cli_single_minimal_deposit( sample_archive, slug, patched_tmp_path, requests_mock_datadir, cli_runner ): """ This ensure a single deposit upload through the cli is fine, cf. https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit """ # noqa metadata_path = os.path.join(patched_tmp_path, "metadata.xml") result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", "--slug", slug, "--format", "json", ], ) assert result.exit_code == 0, result.output assert json.loads(result.output) == { "deposit_id": "615", "deposit_status": "partial", "deposit_status_detail": None, "deposit_date": "Oct. 8, 2020, 4:57 p.m.", } with open(metadata_path) as fd: assert ( fd.read() == f"""\ \ttest-project \t{slug} \t \t\tJane Doe \t """ ) def test_cli_validation_metadata( sample_archive, caplog, patched_tmp_path, cli_runner, slug ): """Multiple metadata flags scenario (missing, conflicts) properly fails the calls """ metadata_path = os.path.join(patched_tmp_path, "metadata.xml") with open(metadata_path, "a"): pass # creates the file for flag_title_or_name, author_or_name in [ ("--author", "no one"), ("--name", "test-project"), ]: # Test missing author then missing name result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--slug", slug, flag_title_or_name, author_or_name, ], ) assert result.exit_code == 1, f"unexpected result: {result.output}" assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided. " "If this is an archive deposit request, none is required." ), ) assert expected_error_log_record in caplog.record_tuples # Clear mocking state caplog.clear() # incompatible flags: Test both --metadata and --author, then --metadata and # --name result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--deposit-id", 666, "--archive", sample_archive["path"], "--slug", slug, ], ) assert result.exit_code == 1, f"unexpected result: {result.output}" assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided." ), ) assert expected_error_log_record in caplog.record_tuples # Clear mocking state caplog.clear() # incompatible flags check (Test both --metadata and --author, # then --metadata and --name) result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--metadata", metadata_path, "--author", "Jane Doe", "--slug", slug, ], ) assert result.exit_code == 1, result.output assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "Using --metadata flag is incompatible with both " "--author and --name (Those are used to generate one metadata file)." ), ) assert expected_error_log_record in caplog.record_tuples caplog.clear() def test_cli_validation_no_actionable_command(caplog, cli_runner): """Multiple metadata flags scenario (missing, conflicts) properly fails the calls """ # no actionable command result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--partial", ], ) assert result.exit_code == 1, result.output assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "Please provide an actionable command. See --help for more information" ), ) assert expected_error_log_record in caplog.record_tuples def test_cli_validation_missing_metadata_flag(caplog, cli_runner): """--metadata-deposit requires --metadata (or --name and --author) otherwise fails """ # --metadata-deposit without --metadata flag fails result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata-deposit", # should fail because missing --metadata flag ], ) assert result.exit_code == 1, result.output assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "Metadata deposit must be provided for metadata " "deposit, either a filepath with --metadata or --name and --author" ), ) assert expected_error_log_record in caplog.record_tuples def test_cli_validation_replace_with_no_deposit_id_fails( sample_archive, caplog, patched_tmp_path, requests_mock_datadir, datadir, cli_runner ): """--replace flags require --deposit-id otherwise fails """ metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml") for flags in [ ["--replace"], ["--replace", "--metadata-deposit", "--archive-deposit"], ]: result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--archive", sample_archive["path"], ] + flags, ) assert result.exit_code == 1, result.output assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "To update an existing deposit, you must provide its id" ), ) assert expected_error_log_record in caplog.record_tuples def test_cli_single_deposit_slug_generation( sample_archive, patched_tmp_path, requests_mock_datadir, cli_runner ): """Single deposit scenario without providing the slug, the slug is generated nonetheless https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit """ # noqa metadata_path = os.path.join(patched_tmp_path, "metadata.xml") result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", "--format", "json", ], ) assert result.exit_code == 0, result.output assert json.loads(result.output) == { "deposit_id": "615", "deposit_status": "partial", "deposit_status_detail": None, "deposit_date": "Oct. 8, 2020, 4:57 p.m.", } with open(metadata_path) as fd: metadata_xml = fd.read() actual_metadata = parse_xml(metadata_xml) assert actual_metadata["codemeta:identifier"] is not None def test_cli_multisteps_deposit( sample_archive, datadir, slug, requests_mock_datadir, cli_runner ): """ First deposit a partial deposit (no metadata, only archive), then update the metadata part. https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#multisteps-deposit """ # noqa api_url = "https://deposit.test.metadata/1" deposit_id = 666 # Create a partial deposit with only 1 archive result = cli_runner.invoke( cli, [ "upload", "--url", api_url, "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--partial", "--slug", slug, "--format", "json", ], ) assert result.exit_code == 0, f"unexpected output: {result.output}" actual_deposit = json.loads(result.output) assert actual_deposit == { "deposit_id": str(deposit_id), "deposit_status": "partial", "deposit_status_detail": None, "deposit_date": "Oct. 8, 2020, 4:57 p.m.", } # Update the partial deposit with only 1 archive result = cli_runner.invoke( cli, [ "upload", "--url", api_url, "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--deposit-id", deposit_id, "--partial", # in-progress: True, because remains the metadata to upload "--slug", slug, "--format", "json", ], ) assert result.exit_code == 0, f"unexpected output: {result.output}" assert result.output is not None actual_deposit = json.loads(result.output) # deposit update scenario actually returns a deposit status dict assert actual_deposit["deposit_id"] == str(deposit_id) assert actual_deposit["deposit_status"] == "partial" # Update the partial deposit with only some metadata (and then finalize it) # https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#add-content-or-metadata-to-the-deposit metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml") # Update deposit with metadata result = cli_runner.invoke( cli, [ "upload", "--url", api_url, "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--deposit-id", deposit_id, "--slug", slug, "--format", "json", ], # this time, ^ we no longer flag it to partial, so the status changes to # in-progress false ) assert result.exit_code == 0, f"unexpected output: {result.output}" assert result.output is not None actual_deposit = json.loads(result.output) # deposit update scenario actually returns a deposit status dict assert actual_deposit["deposit_id"] == str(deposit_id) # FIXME: should be "deposited" but current limitation in the # requests_mock_datadir_visits use, cannot find a way to make it work right now assert actual_deposit["deposit_status"] == "partial" @pytest.mark.parametrize( "output_format,callable_fn", [ ("json", json.loads), ("yaml", yaml.safe_load), ( "logging", ast.literal_eval, ), # not enough though, the caplog fixture is needed ], ) def test_cli_deposit_status_with_output_format( output_format, callable_fn, datadir, slug, requests_mock_datadir, caplog, cli_runner ): """Check deposit status cli with all possible output formats (json, yaml, logging). """ api_url_basename = "deposit.test.status" deposit_id = 1033 deposit_status_xml_path = os.path.join( datadir, f"https_{api_url_basename}", f"1_test_{deposit_id}_status" ) with open(deposit_status_xml_path, "r") as f: deposit_status_xml = f.read() expected_deposit_status = dict(parse_xml(deposit_status_xml)) result = cli_runner.invoke( cli, [ "status", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--deposit-id", deposit_id, "--format", output_format, ], ) assert result.exit_code == 0, f"unexpected output: {result.output}" if output_format == "logging": assert len(caplog.record_tuples) == 1 # format: (, , ) _, _, result_output = caplog.record_tuples[0] else: result_output = result.output actual_deposit = callable_fn(result_output) assert actual_deposit == expected_deposit_status def test_cli_update_metadata_with_swhid_on_completed_deposit( datadir, requests_mock_datadir, cli_runner ): """Update new metadata on a completed deposit (status done) is ok """ api_url_basename = "deposit.test.updateswhid" deposit_id = 123 deposit_status_xml_path = os.path.join( datadir, f"https_{api_url_basename}", f"1_test_{deposit_id}_status" ) with open(deposit_status_xml_path, "r") as f: deposit_status_xml = f.read() expected_deposit_status = dict(parse_xml(deposit_status_xml)) assert expected_deposit_status["deposit_status"] == "done" assert expected_deposit_status["deposit_swh_id"] is not None result = cli_runner.invoke( cli, [ "upload", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--author", "John Doe", "--deposit-id", deposit_id, "--swhid", expected_deposit_status["deposit_swh_id"], "--format", "json", ], ) assert result.exit_code == 0, result.output actual_deposit_status = json.loads(result.output) assert "error" not in actual_deposit_status assert actual_deposit_status == expected_deposit_status def test_cli_update_metadata_with_swhid_on_other_status_deposit( datadir, requests_mock_datadir, cli_runner ): """Update new metadata with swhid on other deposit status is not possible """ api_url_basename = "deposit.test.updateswhid" deposit_id = 321 deposit_status_xml_path = os.path.join( datadir, f"https_{api_url_basename}", f"1_test_{deposit_id}_status" ) with open(deposit_status_xml_path, "r") as f: deposit_status_xml = f.read() expected_deposit_status = dict(parse_xml(deposit_status_xml)) assert expected_deposit_status["deposit_status"] != "done" result = cli_runner.invoke( cli, [ "upload", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--author", "John Doe", "--deposit-id", deposit_id, "--swhid", "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea", "--format", "json", ], ) assert result.exit_code == 0, result.output actual_result = json.loads(result.output) assert "error" in actual_result assert actual_result == { "error": "You can only update metadata on deposit with status 'done'", "detail": "The deposit 321 has status 'partial'", "deposit_status": "partial", "deposit_id": 321, }