diff --git a/swh/deposit/cli/admin.py b/swh/deposit/cli/admin.py
index 17446bf9..2ffcfb0b 100644
--- a/swh/deposit/cli/admin.py
+++ b/swh/deposit/cli/admin.py
@@ -1,275 +1,284 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
import click
from swh.deposit.cli import deposit
+if TYPE_CHECKING:
+ from swh.deposit.models import DepositCollection
+
@deposit.group("admin")
@click.option(
"--config-file",
"-C",
default=None,
type=click.Path(exists=True, dir_okay=False,),
help="Optional extra configuration file.",
)
@click.option(
"--platform",
default="development",
type=click.Choice(["development", "production"]),
help="development or production platform",
)
@click.pass_context
-def admin(ctx, config_file, platform):
+def admin(ctx, config_file: str, platform: str):
"""Server administration tasks (manipulate user or collections)"""
from swh.deposit.config import setup_django_for
# configuration happens here
setup_django_for(platform, config_file=config_file)
@admin.group("user")
@click.pass_context
def user(ctx):
"""Manipulate user."""
# configuration happens here
pass
-def _create_collection(name):
+def _create_collection(name: str) -> DepositCollection:
"""Create the collection with name if it does not exist.
Args:
- name (str): collection's name
+ name: collection name
Returns:
- collection (DepositCollection): the existing collection object
- (created or not)
+ collection: the existing collection object
"""
# to avoid loading too early django namespaces
from swh.deposit.models import DepositCollection
try:
collection = DepositCollection.objects.get(name=name)
- click.echo("Collection %s exists, nothing to do." % name)
+ click.echo(f"Collection '{name}' exists, skipping.")
except DepositCollection.DoesNotExist:
- click.echo("Create new collection %s" % name)
+ click.echo(f"Create collection '{name}'.")
collection = DepositCollection.objects.create(name=name)
- click.echo("Collection %s created" % name)
+ click.echo(f"Collection '{name}' created.")
return collection
@user.command("create")
@click.option("--username", required=True, help="User's name")
@click.option("--password", required=True, help="Desired user's password (plain).")
@click.option("--firstname", default="", help="User's first name")
@click.option("--lastname", default="", help="User's last name")
@click.option("--email", default="", help="User's email")
@click.option("--collection", help="User's collection")
@click.option("--provider-url", default="", help="Provider URL")
@click.option("--domain", default="", help="The domain")
@click.pass_context
def user_create(
ctx,
- username,
- password,
- firstname,
- lastname,
- email,
- collection,
- provider_url,
- domain,
+ username: str,
+ password: str,
+ firstname: str,
+ lastname: str,
+ email: str,
+ collection: str,
+ provider_url: str,
+ domain: str,
):
"""Create a user with some needed information (password, collection)
If the collection does not exist, the collection is then created
alongside.
The password is stored encrypted using django's utilities.
"""
# to avoid loading too early django namespaces
from swh.deposit.models import DepositClient
# If collection is not provided, fallback to username
if not collection:
collection = username
- click.echo("collection: %s" % collection)
# create the collection if it does not exist
- collection = _create_collection(collection)
+ collection_ = _create_collection(collection)
# user create/update
try:
- user = DepositClient.objects.get(username=username)
- click.echo("User %s exists, updating information." % user)
+ user = DepositClient.objects.get(username=username) # type: ignore
+ click.echo(f"Update user '{username}'.")
user.set_password(password)
+ action_done = "updated"
except DepositClient.DoesNotExist:
- click.echo("Create new user %s" % username)
- user = DepositClient.objects.create_user(username=username, password=password)
+ click.echo(f"Create user '{username}'.")
+ user = DepositClient.objects.create_user( # type: ignore
+ username=username, password=password
+ )
+ action_done = "created"
- user.collections = [collection.id]
+ user.collections = [collection_.id]
user.first_name = firstname
user.last_name = lastname
user.email = email
user.is_active = True
user.provider_url = provider_url
user.domain = domain
user.save()
- click.echo("Information registered for user %s" % user)
+ click.echo(f"User '{username}' {action_done}.")
@user.command("list")
@click.pass_context
def user_list(ctx):
"""List existing users.
This entrypoint is not paginated yet as there is not a lot of
entry.
"""
# to avoid loading too early django namespaces
from swh.deposit.models import DepositClient
users = DepositClient.objects.all()
if not users:
output = "Empty user list"
else:
output = "\n".join((user.username for user in users))
click.echo(output)
@user.command("exists")
@click.argument("username", required=True)
@click.pass_context
-def user_exists(ctx, username):
+def user_exists(ctx, username: str):
"""Check if user exists.
"""
# to avoid loading too early django namespaces
from swh.deposit.models import DepositClient
try:
- DepositClient.objects.get(username=username)
- click.echo("User %s exists." % username)
+ DepositClient.objects.get(username=username) # type: ignore
+ click.echo(f"User {username} exists.")
ctx.exit(0)
except DepositClient.DoesNotExist:
- click.echo("User %s does not exist." % username)
+ click.echo(f"User {username} does not exist.")
ctx.exit(1)
@admin.group("collection")
@click.pass_context
def collection(ctx):
"""Manipulate collections."""
pass
@collection.command("create")
@click.option("--name", required=True, help="Collection's name")
@click.pass_context
def collection_create(ctx, name):
_create_collection(name)
@collection.command("list")
@click.pass_context
def collection_list(ctx):
"""List existing collections.
This entrypoint is not paginated yet as there is not a lot of
entry.
"""
# to avoid loading too early django namespaces
from swh.deposit.models import DepositCollection
collections = DepositCollection.objects.all()
if not collections:
output = "Empty collection list"
else:
output = "\n".join((col.name for col in collections))
click.echo(output)
@admin.group("deposit")
@click.pass_context
def adm_deposit(ctx):
"""Manipulate deposit."""
pass
@adm_deposit.command("reschedule")
@click.option("--deposit-id", required=True, help="Deposit identifier")
@click.pass_context
def adm_deposit_reschedule(ctx, deposit_id):
"""Reschedule the deposit loading
This will:
- check the deposit's status to something reasonable (failed or done). That
means that the checks have passed alright but something went wrong during
the loading (failed: loading failed, done: loading ok, still for some
reasons as in bugs, we need to reschedule it)
- reset the deposit's status to 'verified' (prior to any loading but after
the checks which are fine) and removes the different archives'
identifiers (swh-id, ...)
- trigger back the loading task through the scheduler
"""
# to avoid loading too early django namespaces
from datetime import datetime
from swh.deposit.config import (
DEPOSIT_STATUS_LOAD_FAILURE,
DEPOSIT_STATUS_LOAD_SUCCESS,
DEPOSIT_STATUS_VERIFIED,
APIConfig,
)
from swh.deposit.models import Deposit
try:
deposit = Deposit.objects.get(pk=deposit_id)
except Deposit.DoesNotExist:
click.echo("Deposit %s does not exist." % deposit_id)
ctx.exit(1)
# Check the deposit is in a reasonable state
accepted_statuses = [DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_LOAD_FAILURE]
if deposit.status == DEPOSIT_STATUS_VERIFIED:
click.echo("Deposit %s's status already set for rescheduling." % (deposit_id))
ctx.exit(0)
if deposit.status not in accepted_statuses:
click.echo(
"Deposit %s's status be one of %s."
% (deposit_id, ", ".join(accepted_statuses))
)
ctx.exit(1)
task_id = deposit.load_task_id
if not task_id:
click.echo(
"Deposit %s cannot be rescheduled. It misses the "
"associated task." % deposit_id
)
ctx.exit(1)
# Reset the deposit's state
deposit.swhid = None
deposit.swhid_context = None
deposit.status = DEPOSIT_STATUS_VERIFIED
deposit.save()
# Trigger back the deposit
scheduler = APIConfig().scheduler
scheduler.set_status_tasks(
[task_id], status="next_run_not_scheduled", next_run=datetime.now()
)
diff --git a/swh/deposit/tests/cli/conftest.py b/swh/deposit/tests/cli/conftest.py
new file mode 100644
index 00000000..aa23c215
--- /dev/null
+++ b/swh/deposit/tests/cli/conftest.py
@@ -0,0 +1,12 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from click.testing import CliRunner
+import pytest
+
+
+@pytest.fixture
+def cli_runner():
+ return CliRunner()
diff --git a/swh/deposit/tests/cli/test_admin.py b/swh/deposit/tests/cli/test_admin.py
new file mode 100644
index 00000000..73262aab
--- /dev/null
+++ b/swh/deposit/tests/cli/test_admin.py
@@ -0,0 +1,189 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.deposit.cli.admin import admin as cli
+from swh.deposit.models import DepositClient, DepositCollection
+
+
+@pytest.fixture(autouse=True)
+def enable_db_access_for_all_tests(db):
+ pass
+
+
+def test_cli_admin_user_list_nothing(cli_runner):
+ result = cli_runner.invoke(cli, ["user", "list",])
+
+ assert result.exit_code == 0, f"Unexpected output: {result.output}"
+ assert result.output == "Empty user list\n"
+
+
+def test_cli_admin_user_list_with_users(cli_runner, deposit_user):
+ result = cli_runner.invoke(cli, ["user", "list",])
+
+ assert result.exit_code == 0, f"Unexpected output: {result.output}"
+ assert result.output == f"{deposit_user.username}\n" # only 1 user
+
+
+def test_cli_admin_collection_list_nothing(cli_runner):
+ result = cli_runner.invoke(cli, ["collection", "list",])
+
+ assert result.exit_code == 0, f"Unexpected output: {result.output}"
+ assert result.output == "Empty collection list\n"
+
+
+def test_cli_admin_collection_list_with_collections(cli_runner, deposit_collection):
+ from swh.deposit.tests.conftest import create_deposit_collection
+
+ new_collection = create_deposit_collection("something")
+
+ result = cli_runner.invoke(cli, ["collection", "list",])
+
+ assert result.exit_code == 0, f"Unexpected output: {result.output}"
+ collections = "\n".join([deposit_collection.name, new_collection.name])
+ assert result.output == f"{collections}\n"
+
+
+def test_cli_admin_user_exists_unknown(cli_runner):
+ result = cli_runner.invoke(cli, ["user", "exists", "unknown"])
+
+ assert result.exit_code == 1, f"Unexpected output: {result.output}"
+ assert result.output == "User unknown does not exist.\n"
+
+
+def test_cli_admin_user_exists(cli_runner, deposit_user):
+ result = cli_runner.invoke(cli, ["user", "exists", deposit_user.username])
+
+ assert result.exit_code == 0, f"Unexpected output: {result.output}"
+ assert result.output == f"User {deposit_user.username} exists.\n"
+
+
+def test_cli_admin_create_collection(cli_runner):
+ collection_name = "something"
+
+ try:
+ DepositCollection.objects.get(name=collection_name)
+ except DepositCollection.DoesNotExist:
+ pass
+
+ result = cli_runner.invoke(
+ cli, ["collection", "create", "--name", collection_name,]
+ )
+ assert result.exit_code == 0, f"Unexpected output: {result.output}"
+
+ collection = DepositCollection.objects.get(name=collection_name)
+ assert collection is not None
+
+ assert (
+ result.output
+ == f"""Create collection '{collection_name}'.
+Collection '{collection_name}' created.
+"""
+ )
+
+ result2 = cli_runner.invoke(
+ cli, ["collection", "create", "--name", collection_name,]
+ )
+ assert result2.exit_code == 0, f"Unexpected output: {result.output}"
+ assert (
+ result2.output
+ == f"""Collection '{collection_name}' exists, skipping.
+"""
+ )
+
+
+def test_cli_admin_user_create(cli_runner):
+ user_name = "user"
+ collection_name = user_name
+
+ try:
+ DepositClient.objects.get(username=user_name)
+ except DepositClient.DoesNotExist:
+ pass
+
+ try:
+ DepositCollection.objects.get(name=collection_name)
+ except DepositCollection.DoesNotExist:
+ pass
+
+ result = cli_runner.invoke(
+ cli, ["user", "create", "--username", user_name, "--password", "password",]
+ )
+ assert result.exit_code == 0, f"Unexpected output: {result.output}"
+ user = DepositClient.objects.get(username=user_name)
+ assert user is not None
+ collection = DepositCollection.objects.get(name=collection_name)
+ assert collection is not None
+
+ assert (
+ result.output
+ == f"""Create collection '{user_name}'.
+Collection '{collection_name}' created.
+Create user '{user_name}'.
+User '{user_name}' created.
+"""
+ )
+
+ assert collection.name == collection_name
+ assert user.username == user_name
+ first_password = user.password
+ assert first_password is not None
+ assert user.collections == [collection.id]
+ assert user.is_active is True
+ assert user.domain == ""
+ assert user.provider_url == ""
+ assert user.email == ""
+ assert user.first_name == ""
+ assert user.last_name == ""
+
+ # create a user that already exists
+ result2 = cli_runner.invoke(
+ cli,
+ [
+ "user",
+ "create",
+ "--username",
+ "user",
+ "--password",
+ "another-password", # changing password
+ "--collection",
+ collection_name, # specifying the collection this time
+ "--firstname",
+ "User",
+ "--lastname",
+ "no one",
+ "--email",
+ "user@org.org",
+ "--provider-url",
+ "http://some-provider.org",
+ "--domain",
+ "domain",
+ ],
+ )
+
+ assert result2.exit_code == 0, f"Unexpected output: {result2.output}"
+ user = DepositClient.objects.get(username=user_name)
+ assert user is not None
+
+ assert user.username == user_name
+ assert user.collections == [collection.id]
+ assert user.is_active is True
+ second_password = user.password
+ assert second_password is not None
+ assert second_password != first_password, "Password should have changed"
+ assert user.domain == "domain"
+ assert user.provider_url == "http://some-provider.org"
+ assert user.email == "user@org.org"
+ assert user.first_name == "User"
+ assert user.last_name == "no one"
+
+ assert (
+ result2.output
+ == f"""Collection '{collection_name}' exists, skipping.
+Update user '{user_name}'.
+User '{user_name}' updated.
+"""
+ )
diff --git a/swh/deposit/tests/cli/test_client.py b/swh/deposit/tests/cli/test_client.py
index b5064e1d..2d879b3a 100644
--- a/swh/deposit/tests/cli/test_client.py
+++ b/swh/deposit/tests/cli/test_client.py
@@ -1,723 +1,717 @@
-# Copyright (C) 2019-2020 The Software Heritage developers
+# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import ast
import contextlib
import json
import logging
import os
from unittest.mock import MagicMock
-from click.testing import CliRunner
import pytest
import yaml
from swh.deposit.cli import deposit as cli
from swh.deposit.cli.client import InputError, _client, _collection, _url, generate_slug
from swh.deposit.client import MaintenanceError, PublicApiDepositClient
from swh.deposit.parsers import parse_xml
from ..conftest import TEST_USER
@pytest.fixture
def deposit_config():
return {
"url": "https://deposit.swh.test/1",
"auth": {"username": "test", "password": "test",},
}
@pytest.fixture
def datadir(request):
"""Override default datadir to target main test datadir"""
return os.path.join(os.path.dirname(str(request.fspath)), "../data")
@pytest.fixture
def slug():
return generate_slug()
@pytest.fixture
def patched_tmp_path(tmp_path, mocker):
mocker.patch(
"tempfile.TemporaryDirectory",
return_value=contextlib.nullcontext(str(tmp_path)),
)
return tmp_path
-@pytest.fixture
-def cli_runner():
- return CliRunner()
-
-
@pytest.fixture
def client_mock_api_down(mocker, slug):
"""A mock client whose connection with api fails due to maintenance issue
"""
mock_client = MagicMock()
mocker.patch("swh.deposit.cli.client._client", return_value=mock_client)
mock_client.service_document.side_effect = MaintenanceError(
"Database backend maintenance: Temporarily unavailable, try again later."
)
return mock_client
def test_cli_url():
assert _url("http://deposit") == "http://deposit/1"
assert _url("https://other/1") == "https://other/1"
def test_cli_client():
client = _client("http://deposit", "user", "pass")
assert isinstance(client, PublicApiDepositClient)
def test_cli_collection_error():
mock_client = MagicMock()
mock_client.service_document.return_value = {"error": "something went wrong"}
with pytest.raises(InputError) as e:
_collection(mock_client)
assert "Service document retrieval: something went wrong" == str(e.value)
def test_cli_collection_ok(deposit_config, requests_mock_datadir):
client = PublicApiDepositClient(deposit_config)
collection_name = _collection(client)
assert collection_name == "test"
def test_cli_collection_ko_because_downtime():
mock_client = MagicMock()
mock_client.service_document.side_effect = MaintenanceError("downtime")
with pytest.raises(MaintenanceError, match="downtime"):
_collection(mock_client)
def test_cli_deposit_with_server_down_for_maintenance(
sample_archive, caplog, client_mock_api_down, slug, patched_tmp_path, cli_runner
):
""" Deposit failure due to maintenance down time should be explicit
"""
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
"https://deposit.swh.test/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--name",
"test-project",
"--archive",
sample_archive["path"],
"--author",
"Jane Doe",
],
)
assert result.exit_code == 1, result.output
assert result.output == ""
down_for_maintenance_log_record = (
"swh.deposit.cli.client",
logging.ERROR,
"Database backend maintenance: Temporarily unavailable, try again later.",
)
assert down_for_maintenance_log_record in caplog.record_tuples
client_mock_api_down.service_document.assert_called_once_with()
def test_cli_single_minimal_deposit(
sample_archive, slug, patched_tmp_path, requests_mock_datadir, cli_runner
):
""" This ensure a single deposit upload through the cli is fine, cf.
https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit
""" # noqa
metadata_path = os.path.join(patched_tmp_path, "metadata.xml")
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
"https://deposit.swh.test/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--name",
"test-project",
"--archive",
sample_archive["path"],
"--author",
"Jane Doe",
"--slug",
slug,
"--format",
"json",
],
)
assert result.exit_code == 0, result.output
assert json.loads(result.output) == {
"deposit_id": "615",
"deposit_status": "partial",
"deposit_status_detail": None,
"deposit_date": "Oct. 8, 2020, 4:57 p.m.",
}
with open(metadata_path) as fd:
assert (
fd.read()
== f"""\
\ttest-project
\t{slug}
\t
\t\tJane Doe
\t
"""
)
def test_cli_validation_metadata(
sample_archive, caplog, patched_tmp_path, cli_runner, slug
):
"""Multiple metadata flags scenario (missing, conflicts) properly fails the calls
"""
metadata_path = os.path.join(patched_tmp_path, "metadata.xml")
with open(metadata_path, "a"):
pass # creates the file
for flag_title_or_name, author_or_name in [
("--author", "no one"),
("--name", "test-project"),
]:
# Test missing author then missing name
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
"https://deposit.swh.test/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--archive",
sample_archive["path"],
"--slug",
slug,
flag_title_or_name,
author_or_name,
],
)
assert result.exit_code == 1, f"unexpected result: {result.output}"
assert result.output == ""
expected_error_log_record = (
"swh.deposit.cli.client",
logging.ERROR,
(
"Problem during parsing options: "
"For metadata deposit request, either a metadata file with "
"--metadata or both --author and --name must be provided. "
"If this is an archive deposit request, none is required."
),
)
assert expected_error_log_record in caplog.record_tuples
# Clear mocking state
caplog.clear()
# incompatible flags: Test both --metadata and --author, then --metadata and
# --name
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
"https://deposit.swh.test/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--name",
"test-project",
"--deposit-id",
666,
"--archive",
sample_archive["path"],
"--slug",
slug,
],
)
assert result.exit_code == 1, f"unexpected result: {result.output}"
assert result.output == ""
expected_error_log_record = (
"swh.deposit.cli.client",
logging.ERROR,
(
"Problem during parsing options: "
"For metadata deposit request, either a metadata file with "
"--metadata or both --author and --name must be provided."
),
)
assert expected_error_log_record in caplog.record_tuples
# Clear mocking state
caplog.clear()
# incompatible flags check (Test both --metadata and --author,
# then --metadata and --name)
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
"https://deposit.swh.test/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--archive",
sample_archive["path"],
"--metadata",
metadata_path,
"--author",
"Jane Doe",
"--slug",
slug,
],
)
assert result.exit_code == 1, result.output
assert result.output == ""
expected_error_log_record = (
"swh.deposit.cli.client",
logging.ERROR,
(
"Problem during parsing options: "
"Using --metadata flag is incompatible with both "
"--author and --name (Those are used to generate one metadata file)."
),
)
assert expected_error_log_record in caplog.record_tuples
caplog.clear()
def test_cli_validation_no_actionable_command(caplog, cli_runner):
"""Multiple metadata flags scenario (missing, conflicts) properly fails the calls
"""
# no actionable command
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
"https://deposit.swh.test/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--partial",
],
)
assert result.exit_code == 1, result.output
assert result.output == ""
expected_error_log_record = (
"swh.deposit.cli.client",
logging.ERROR,
(
"Problem during parsing options: "
"Please provide an actionable command. See --help for more information"
),
)
assert expected_error_log_record in caplog.record_tuples
def test_cli_validation_missing_metadata_flag(caplog, cli_runner):
"""--metadata-deposit requires --metadata (or --name and --author) otherwise fails
"""
# --metadata-deposit without --metadata flag fails
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
"https://deposit.swh.test/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--metadata-deposit", # should fail because missing --metadata flag
],
)
assert result.exit_code == 1, result.output
assert result.output == ""
expected_error_log_record = (
"swh.deposit.cli.client",
logging.ERROR,
(
"Problem during parsing options: "
"Metadata deposit must be provided for metadata "
"deposit, either a filepath with --metadata or --name and --author"
),
)
assert expected_error_log_record in caplog.record_tuples
def test_cli_validation_replace_with_no_deposit_id_fails(
sample_archive, caplog, patched_tmp_path, requests_mock_datadir, datadir, cli_runner
):
"""--replace flags require --deposit-id otherwise fails
"""
metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml")
for flags in [
["--replace"],
["--replace", "--metadata-deposit", "--archive-deposit"],
]:
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
"https://deposit.swh.test/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--metadata",
metadata_path,
"--archive",
sample_archive["path"],
]
+ flags,
)
assert result.exit_code == 1, result.output
assert result.output == ""
expected_error_log_record = (
"swh.deposit.cli.client",
logging.ERROR,
(
"Problem during parsing options: "
"To update an existing deposit, you must provide its id"
),
)
assert expected_error_log_record in caplog.record_tuples
def test_cli_single_deposit_slug_generation(
sample_archive, patched_tmp_path, requests_mock_datadir, cli_runner
):
"""Single deposit scenario without providing the slug, the slug is generated nonetheless
https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit
""" # noqa
metadata_path = os.path.join(patched_tmp_path, "metadata.xml")
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
"https://deposit.swh.test/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--name",
"test-project",
"--archive",
sample_archive["path"],
"--author",
"Jane Doe",
"--format",
"json",
],
)
assert result.exit_code == 0, result.output
assert json.loads(result.output) == {
"deposit_id": "615",
"deposit_status": "partial",
"deposit_status_detail": None,
"deposit_date": "Oct. 8, 2020, 4:57 p.m.",
}
with open(metadata_path) as fd:
metadata_xml = fd.read()
actual_metadata = parse_xml(metadata_xml)
assert actual_metadata["codemeta:identifier"] is not None
def test_cli_multisteps_deposit(
sample_archive, datadir, slug, requests_mock_datadir, cli_runner
):
""" First deposit a partial deposit (no metadata, only archive), then update the metadata part.
https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#multisteps-deposit
""" # noqa
api_url = "https://deposit.test.metadata/1"
deposit_id = 666
# Create a partial deposit with only 1 archive
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
api_url,
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--archive",
sample_archive["path"],
"--partial",
"--slug",
slug,
"--format",
"json",
],
)
assert result.exit_code == 0, f"unexpected output: {result.output}"
actual_deposit = json.loads(result.output)
assert actual_deposit == {
"deposit_id": str(deposit_id),
"deposit_status": "partial",
"deposit_status_detail": None,
"deposit_date": "Oct. 8, 2020, 4:57 p.m.",
}
# Update the partial deposit with only 1 archive
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
api_url,
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--archive",
sample_archive["path"],
"--deposit-id",
deposit_id,
"--partial", # in-progress: True, because remains the metadata to upload
"--slug",
slug,
"--format",
"json",
],
)
assert result.exit_code == 0, f"unexpected output: {result.output}"
assert result.output is not None
actual_deposit = json.loads(result.output)
# deposit update scenario actually returns a deposit status dict
assert actual_deposit["deposit_id"] == str(deposit_id)
assert actual_deposit["deposit_status"] == "partial"
# Update the partial deposit with only some metadata (and then finalize it)
# https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#add-content-or-metadata-to-the-deposit
metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml")
# Update deposit with metadata
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
api_url,
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--metadata",
metadata_path,
"--deposit-id",
deposit_id,
"--slug",
slug,
"--format",
"json",
], # this time, ^ we no longer flag it to partial, so the status changes to
# in-progress false
)
assert result.exit_code == 0, f"unexpected output: {result.output}"
assert result.output is not None
actual_deposit = json.loads(result.output)
# deposit update scenario actually returns a deposit status dict
assert actual_deposit["deposit_id"] == str(deposit_id)
# FIXME: should be "deposited" but current limitation in the
# requests_mock_datadir_visits use, cannot find a way to make it work right now
assert actual_deposit["deposit_status"] == "partial"
@pytest.mark.parametrize(
"output_format,callable_fn",
[
("json", json.loads),
("yaml", yaml.safe_load),
(
"logging",
ast.literal_eval,
), # not enough though, the caplog fixture is needed
],
)
def test_cli_deposit_status_with_output_format(
output_format, callable_fn, datadir, slug, requests_mock_datadir, caplog, cli_runner
):
"""Check deposit status cli with all possible output formats (json, yaml, logging).
"""
api_url_basename = "deposit.test.status"
deposit_id = 1033
deposit_status_xml_path = os.path.join(
datadir, f"https_{api_url_basename}", f"1_test_{deposit_id}_status"
)
with open(deposit_status_xml_path, "r") as f:
deposit_status_xml = f.read()
expected_deposit_status = dict(parse_xml(deposit_status_xml))
result = cli_runner.invoke(
cli,
[
"status",
"--url",
f"https://{api_url_basename}/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--deposit-id",
deposit_id,
"--format",
output_format,
],
)
assert result.exit_code == 0, f"unexpected output: {result.output}"
if output_format == "logging":
assert len(caplog.record_tuples) == 1
# format: (, , )
_, _, result_output = caplog.record_tuples[0]
else:
result_output = result.output
actual_deposit = callable_fn(result_output)
assert actual_deposit == expected_deposit_status
def test_cli_update_metadata_with_swhid_on_completed_deposit(
datadir, requests_mock_datadir, cli_runner
):
"""Update new metadata on a completed deposit (status done) is ok
"""
api_url_basename = "deposit.test.updateswhid"
deposit_id = 123
deposit_status_xml_path = os.path.join(
datadir, f"https_{api_url_basename}", f"1_test_{deposit_id}_status"
)
with open(deposit_status_xml_path, "r") as f:
deposit_status_xml = f.read()
expected_deposit_status = dict(parse_xml(deposit_status_xml))
assert expected_deposit_status["deposit_status"] == "done"
assert expected_deposit_status["deposit_swh_id"] is not None
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
f"https://{api_url_basename}/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--name",
"test-project",
"--author",
"John Doe",
"--deposit-id",
deposit_id,
"--swhid",
expected_deposit_status["deposit_swh_id"],
"--format",
"json",
],
)
assert result.exit_code == 0, result.output
actual_deposit_status = json.loads(result.output)
assert "error" not in actual_deposit_status
assert actual_deposit_status == expected_deposit_status
def test_cli_update_metadata_with_swhid_on_other_status_deposit(
datadir, requests_mock_datadir, cli_runner
):
"""Update new metadata with swhid on other deposit status is not possible
"""
api_url_basename = "deposit.test.updateswhid"
deposit_id = 321
deposit_status_xml_path = os.path.join(
datadir, f"https_{api_url_basename}", f"1_test_{deposit_id}_status"
)
with open(deposit_status_xml_path, "r") as f:
deposit_status_xml = f.read()
expected_deposit_status = dict(parse_xml(deposit_status_xml))
assert expected_deposit_status["deposit_status"] != "done"
result = cli_runner.invoke(
cli,
[
"upload",
"--url",
f"https://{api_url_basename}/1",
"--username",
TEST_USER["username"],
"--password",
TEST_USER["password"],
"--name",
"test-project",
"--author",
"John Doe",
"--deposit-id",
deposit_id,
"--swhid",
"swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea",
"--format",
"json",
],
)
assert result.exit_code == 0, result.output
actual_result = json.loads(result.output)
assert "error" in actual_result
assert actual_result == {
"error": "You can only update metadata on deposit with status 'done'",
"detail": "The deposit 321 has status 'partial'",
"deposit_status": "partial",
"deposit_id": 321,
}