diff --git a/swh/deposit/cli/__init__.py b/swh/deposit/cli/__init__.py index cc6e00dc..e5c50d1f 100644 --- a/swh/deposit/cli/__init__.py +++ b/swh/deposit/cli/__init__.py @@ -1,40 +1,42 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +# WARNING: do not import unnecessary things here to keep cli startup time under +# control import click import logging from swh.core.cli import CONTEXT_SETTINGS logger = logging.getLogger(__name__) @click.group(context_settings=CONTEXT_SETTINGS) @click.pass_context def deposit(ctx): """Deposit main command """ ctx.ensure_object(dict) log_level = ctx.obj.get("log_level", logging.INFO) logger.setLevel(log_level) def main(): logging.basicConfig() return deposit(auto_envvar_prefix="SWH_DEPOSIT") # These import statements MUST be executed after defining the 'deposit' group # since the subcommands in these are defined using this 'deposit' group. from . import client # noqa try: from . import admin # noqa except ImportError: # server part is optional logger.debug("admin subcommand not loaded") if __name__ == "__main__": main() diff --git a/swh/deposit/cli/admin.py b/swh/deposit/cli/admin.py index 6b387940..125b1821 100644 --- a/swh/deposit/cli/admin.py +++ b/swh/deposit/cli/admin.py @@ -1,271 +1,274 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +# WARNING: do not import unnecessary things here to keep cli startup time under +# control import click -from swh.deposit.config import setup_django_for from swh.deposit.cli import deposit @deposit.group("admin") @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Optional extra configuration file.", ) @click.option( "--platform", default="development", type=click.Choice(["development", "production"]), help="development or production platform", ) @click.pass_context def admin(ctx, config_file, platform): """Server administration tasks (manipulate user or collections)""" + from swh.deposit.config import setup_django_for + # configuration happens here setup_django_for(platform, config_file=config_file) @admin.group("user") @click.pass_context def user(ctx): """Manipulate user.""" # configuration happens here pass def _create_collection(name): """Create the collection with name if it does not exist. Args: name (str): collection's name Returns: collection (DepositCollection): the existing collection object (created or not) """ # to avoid loading too early django namespaces from swh.deposit.models import DepositCollection try: collection = DepositCollection.objects.get(name=name) click.echo("Collection %s exists, nothing to do." % name) except DepositCollection.DoesNotExist: click.echo("Create new collection %s" % name) collection = DepositCollection.objects.create(name=name) click.echo("Collection %s created" % name) return collection @user.command("create") @click.option("--username", required=True, help="User's name") @click.option("--password", required=True, help="Desired user's password (plain).") @click.option("--firstname", default="", help="User's first name") @click.option("--lastname", default="", help="User's last name") @click.option("--email", default="", help="User's email") @click.option("--collection", help="User's collection") @click.option("--provider-url", default="", help="Provider URL") @click.option("--domain", default="", help="The domain") @click.pass_context def user_create( ctx, username, password, firstname, lastname, email, collection, provider_url, domain, ): """Create a user with some needed information (password, collection) If the collection does not exist, the collection is then created alongside. The password is stored encrypted using django's utilities. """ # to avoid loading too early django namespaces from swh.deposit.models import DepositClient # If collection is not provided, fallback to username if not collection: collection = username click.echo("collection: %s" % collection) # create the collection if it does not exist collection = _create_collection(collection) # user create/update try: user = DepositClient.objects.get(username=username) click.echo("User %s exists, updating information." % user) user.set_password(password) except DepositClient.DoesNotExist: click.echo("Create new user %s" % username) user = DepositClient.objects.create_user(username=username, password=password) user.collections = [collection.id] user.first_name = firstname user.last_name = lastname user.email = email user.is_active = True user.provider_url = provider_url user.domain = domain user.save() click.echo("Information registered for user %s" % user) @user.command("list") @click.pass_context def user_list(ctx): """List existing users. This entrypoint is not paginated yet as there is not a lot of entry. """ # to avoid loading too early django namespaces from swh.deposit.models import DepositClient users = DepositClient.objects.all() if not users: output = "Empty user list" else: output = "\n".join((user.username for user in users)) click.echo(output) @user.command("exists") @click.argument("username", required=True) @click.pass_context def user_exists(ctx, username): """Check if user exists. """ # to avoid loading too early django namespaces from swh.deposit.models import DepositClient try: DepositClient.objects.get(username=username) click.echo("User %s exists." % username) ctx.exit(0) except DepositClient.DoesNotExist: click.echo("User %s does not exist." % username) ctx.exit(1) @admin.group("collection") @click.pass_context def collection(ctx): """Manipulate collections.""" pass @collection.command("create") @click.option("--name", required=True, help="Collection's name") @click.pass_context def collection_create(ctx, name): _create_collection(name) @collection.command("list") @click.pass_context def collection_list(ctx): """List existing collections. This entrypoint is not paginated yet as there is not a lot of entry. """ # to avoid loading too early django namespaces from swh.deposit.models import DepositCollection collections = DepositCollection.objects.all() if not collections: output = "Empty collection list" else: output = "\n".join((col.name for col in collections)) click.echo(output) @admin.group("deposit") @click.pass_context def adm_deposit(ctx): """Manipulate deposit.""" pass @adm_deposit.command("reschedule") @click.option("--deposit-id", required=True, help="Deposit identifier") @click.pass_context def adm_deposit_reschedule(ctx, deposit_id): """Reschedule the deposit loading This will: - check the deposit's status to something reasonable (failed or done). That means that the checks have passed alright but something went wrong during the loading (failed: loading failed, done: loading ok, still for some reasons as in bugs, we need to reschedule it) - reset the deposit's status to 'verified' (prior to any loading but after the checks which are fine) and removes the different archives' identifiers (swh-id, ...) - trigger back the loading task through the scheduler """ # to avoid loading too early django namespaces from datetime import datetime from swh.deposit.models import Deposit from swh.deposit.config import ( DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_VERIFIED, SWHDefaultConfig, ) try: deposit = Deposit.objects.get(pk=deposit_id) except Deposit.DoesNotExist: click.echo("Deposit %s does not exist." % deposit_id) ctx.exit(1) # Check the deposit is in a reasonable state accepted_statuses = [DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_LOAD_FAILURE] if deposit.status == DEPOSIT_STATUS_VERIFIED: click.echo("Deposit %s's status already set for rescheduling." % (deposit_id)) ctx.exit(0) if deposit.status not in accepted_statuses: click.echo( "Deposit %s's status be one of %s." % (deposit_id, ", ".join(accepted_statuses)) ) ctx.exit(1) task_id = deposit.load_task_id if not task_id: click.echo( "Deposit %s cannot be rescheduled. It misses the " "associated task." % deposit_id ) ctx.exit(1) # Reset the deposit's state deposit.swh_id = None deposit.swh_id_context = None deposit.status = DEPOSIT_STATUS_VERIFIED deposit.save() # Trigger back the deposit scheduler = SWHDefaultConfig().scheduler scheduler.set_status_tasks( [task_id], status="next_run_not_scheduled", next_run=datetime.now() ) diff --git a/swh/deposit/cli/client.py b/swh/deposit/cli/client.py index cf618307..471bd7d7 100644 --- a/swh/deposit/cli/client.py +++ b/swh/deposit/cli/client.py @@ -1,497 +1,507 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +# WARNING: do not import unnecessary things here to keep cli startup time under +# control import os import logging import sys -import tempfile -import uuid -import json -import yaml import click -import xmltodict -from swh.deposit.client import PublicApiDepositClient, MaintenanceError from swh.deposit.cli import deposit logger = logging.getLogger(__name__) class InputError(ValueError): """Input script error """ pass def generate_slug(): """Generate a slug (sample purposes). """ + import uuid + return str(uuid.uuid4()) def _url(url): """Force the /1 api version at the end of the url (avoiding confusing issues without it). Args: url (str): api url used by cli users Returns: Top level api url to actually request """ if not url.endswith("/1"): url = "%s/1" % url return url def generate_metadata_file(name, external_id, authors, temp_dir): """Generate a temporary metadata file with the minimum required metadata This generates a xml file in a temporary location and returns the path to that file. This is up to the client of that function to clean up the temporary file. Args: name (str): Software's name external_id (str): External identifier (slug) or generated one authors (List[str]): List of author names Returns: Filepath to the metadata generated file """ + import xmltodict + path = os.path.join(temp_dir, "metadata.xml") # generate a metadata file with the minimum required metadata codemetadata = { "entry": { "@xmlns": "http://www.w3.org/2005/Atom", "@xmlns:codemeta": "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0", "codemeta:name": name, "codemeta:identifier": external_id, "codemeta:author": [ {"codemeta:name": author_name} for author_name in authors ], }, } logging.debug("Temporary file: %s", path) logging.debug("Metadata dict to generate as xml: %s", codemetadata) s = xmltodict.unparse(codemetadata, pretty=True) logging.debug("Metadata dict as xml generated: %s", s) with open(path, "w") as fp: fp.write(s) return path def _client(url, username, password): """Instantiate a client to access the deposit api server Args: url (str): Deposit api server username (str): User password (str): User's password """ + from swh.deposit.client import PublicApiDepositClient + client = PublicApiDepositClient( {"url": url, "auth": {"username": username, "password": password},} ) return client def _collection(client): """Retrieve the client's collection """ # retrieve user's collection sd_content = client.service_document() if "error" in sd_content: raise InputError("Service document retrieval: %s" % (sd_content["error"],)) collection = sd_content["service"]["workspace"]["collection"]["sword:name"] return collection def client_command_parse_input( username, password, archive, metadata, archive_deposit, metadata_deposit, collection, slug, partial, deposit_id, replace, url, name, authors, temp_dir, ): """Parse the client subcommand options and make sure the combination is acceptable*. If not, an InputError exception is raised explaining the issue. By acceptable, we mean: - A multipart deposit (create or update) requires: - an existing software archive - an existing metadata file or author(s) and name provided in params - A binary deposit (create/update) requires an existing software archive - A metadata deposit (create/update) requires an existing metadata file or author(s) and name provided in params - A deposit update requires a deposit_id This will not prevent all failure cases though. The remaining errors are already dealt with by the underlying api client. Raises: InputError explaining the user input related issue MaintenanceError explaining the api status Returns: dict with the following keys: 'archive': the software archive to deposit 'username': username 'password': associated password 'metadata': the metadata file to deposit 'collection': the username's associated client 'slug': the slug or external id identifying the deposit to make 'partial': if the deposit is partial or not 'client': instantiated class 'url': deposit's server main entry point 'deposit_type': deposit's type (binary, multipart, metadata) 'deposit_id': optional deposit identifier """ if archive_deposit and metadata_deposit: # too many flags use, remove redundant ones (-> multipart deposit) archive_deposit = False metadata_deposit = False if not slug: # generate one as this is mandatory slug = generate_slug() if not metadata: if name and authors: metadata = generate_metadata_file(name, slug, authors, temp_dir) elif not archive_deposit and not partial and not deposit_id: # If we meet all the following conditions: # * there is not an archive-only deposit # * it is not part of a multipart deposit (either create/update # or finish) # * it misses either name or authors raise InputError( "Either a metadata file (--metadata) or both --author and " "--name must be provided, unless this is an archive-only " "deposit." ) elif name or authors: # If we are generating metadata, then all mandatory metadata # must be present raise InputError( "Either a metadata file (--metadata) or both --author and " "--name must be provided." ) else: # TODO: this is a multipart deposit, we might want to check that # metadata are deposited at some point pass elif name or authors: raise InputError( "Using a metadata file (--metadata) is incompatible with " "--author and --name, which are used to generate one." ) if metadata_deposit: archive = None if archive_deposit: metadata = None if metadata_deposit and not metadata: raise InputError( "Metadata deposit must be provided for metadata " "deposit (either a filepath or --name and --author)" ) if not archive and not metadata and partial: raise InputError( "Please provide an actionable command. See --help for more " "information" ) if replace and not deposit_id: raise InputError("To update an existing deposit, you must provide its id") client = _client(url, username, password) if not collection: collection = _collection(client) return { "archive": archive, "username": username, "password": password, "metadata": metadata, "collection": collection, "slug": slug, "in_progress": partial, "client": client, "url": url, "deposit_id": deposit_id, "replace": replace, } def _subdict(d, keys): "return a dict from d with only given keys" return {k: v for k, v in d.items() if k in keys} def deposit_create(config, logger): """Delegate the actual deposit to the deposit client. """ logger.debug("Create deposit") client = config["client"] keys = ("collection", "archive", "metadata", "slug", "in_progress") return client.deposit_create(**_subdict(config, keys)) def deposit_update(config, logger): """Delegate the actual deposit to the deposit client. """ logger.debug("Update deposit") client = config["client"] keys = ( "collection", "deposit_id", "archive", "metadata", "slug", "in_progress", "replace", ) return client.deposit_update(**_subdict(config, keys)) @deposit.command() @click.option("--username", required=True, help="(Mandatory) User's name") @click.option( "--password", required=True, help="(Mandatory) User's associated password" ) @click.option( "--archive", type=click.Path(exists=True), help="(Optional) Software archive to deposit", ) @click.option( "--metadata", type=click.Path(exists=True), help=( "(Optional) Path to xml metadata file. If not provided, " "this will use a file named .metadata.xml" ), ) # noqa @click.option( "--archive-deposit/--no-archive-deposit", default=False, help="(Optional) Software archive only deposit", ) @click.option( "--metadata-deposit/--no-metadata-deposit", default=False, help="(Optional) Metadata only deposit", ) @click.option( "--collection", help="(Optional) User's collection. If not provided, this will be fetched.", ) # noqa @click.option( "--slug", help=( "(Optional) External system information identifier. " "If not provided, it will be generated" ), ) # noqa @click.option( "--partial/--no-partial", default=False, help=( "(Optional) The deposit will be partial, other deposits " "will have to take place to finalize it." ), ) # noqa @click.option( "--deposit-id", default=None, help="(Optional) Update an existing partial deposit with its identifier", ) # noqa @click.option( "--replace/--no-replace", default=False, help="(Optional) Update by replacing existing metadata to a deposit", ) # noqa @click.option( "--url", default="https://deposit.softwareheritage.org", help=( "(Optional) Deposit server api endpoint. By default, " "https://deposit.softwareheritage.org/1" ), ) # noqa @click.option("--verbose/--no-verbose", default=False, help="Verbose mode") @click.option("--name", help="Software name") @click.option( "--author", multiple=True, help="Software author(s), this can be repeated as many times" " as there are authors", ) @click.option( "-f", "--format", "output_format", default="logging", type=click.Choice(["logging", "yaml", "json"]), help="Output format results.", ) @click.pass_context def upload( ctx, username, password, archive=None, metadata=None, archive_deposit=False, metadata_deposit=False, collection=None, slug=None, partial=False, deposit_id=None, replace=False, url="https://deposit.softwareheritage.org", verbose=False, name=None, author=None, output_format=None, ): """Software Heritage Public Deposit Client Create/Update deposit through the command line. More documentation can be found at https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html. """ + import tempfile + from swh.deposit.client import MaintenanceError + url = _url(url) config = {} with tempfile.TemporaryDirectory() as temp_dir: try: logger.debug("Parsing cli options") config = client_command_parse_input( username, password, archive, metadata, archive_deposit, metadata_deposit, collection, slug, partial, deposit_id, replace, url, name, author, temp_dir, ) except InputError as e: logger.error("Problem during parsing options: %s", e) sys.exit(1) except MaintenanceError as e: logger.error(e) sys.exit(1) if verbose: logger.info("Parsed configuration: %s" % (config,)) deposit_id = config["deposit_id"] if deposit_id: r = deposit_update(config, logger) else: r = deposit_create(config, logger) print_result(r, output_format) @deposit.command() @click.option( "--url", default="https://deposit.softwareheritage.org", help="(Optional) Deposit server api endpoint. By default, " "https://deposit.softwareheritage.org/1", ) @click.option("--username", required=True, help="(Mandatory) User's name") @click.option( "--password", required=True, help="(Mandatory) User's associated password" ) @click.option("--deposit-id", default=None, required=True, help="Deposit identifier.") @click.option( "-f", "--format", "output_format", default="logging", type=click.Choice(["logging", "yaml", "json"]), help="Output format results.", ) @click.pass_context def status(ctx, url, username, password, deposit_id, output_format): """Deposit's status """ + from swh.deposit.client import MaintenanceError + url = _url(url) logger.debug("Status deposit") try: client = _client(url, username, password) collection = _collection(client) except InputError as e: logger.error("Problem during parsing options: %s", e) sys.exit(1) except MaintenanceError as e: logger.error(e) sys.exit(1) print_result( client.deposit_status(collection=collection, deposit_id=deposit_id), output_format, ) def print_result(data, output_format): + import json + import yaml + if output_format == "json": click.echo(json.dumps(data)) elif output_format == "yaml": click.echo(yaml.dump(data)) else: logger.info(data) diff --git a/swh/deposit/tests/cli/test_client.py b/swh/deposit/tests/cli/test_client.py index 7a72cae9..d3b0b9ea 100644 --- a/swh/deposit/tests/cli/test_client.py +++ b/swh/deposit/tests/cli/test_client.py @@ -1,463 +1,463 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import logging import os import re from unittest.mock import MagicMock from click.testing import CliRunner import pytest from swh.deposit.client import PublicApiDepositClient, MaintenanceError from swh.deposit.cli.client import generate_slug, _url, _client, _collection, InputError from swh.deposit.cli import deposit as cli from ..conftest import TEST_USER EXAMPLE_SERVICE_DOCUMENT = { "service": {"workspace": {"collection": {"sword:name": "softcol",}}} } @pytest.fixture def datadir(request): """Override default datadir to target main test datadir""" return os.path.join(os.path.dirname(str(request.fspath)), "../data") @pytest.fixture def slug(): return generate_slug() @pytest.fixture def client_mock(mocker, slug): """A successful deposit client with hard-coded default values """ mocker.patch("swh.deposit.cli.client.generate_slug", return_value=slug) mock_client = MagicMock() mocker.patch("swh.deposit.cli.client._client", return_value=mock_client) mock_client.service_document.return_value = EXAMPLE_SERVICE_DOCUMENT mock_client.deposit_create.return_value = '{"foo": "bar"}' return mock_client @pytest.fixture def client_mock_api_down(mocker, slug): """A mock client whose connection with api fails due to maintenance issue """ mocker.patch("swh.deposit.cli.client.generate_slug", return_value=slug) mock_client = MagicMock() mocker.patch("swh.deposit.cli.client._client", return_value=mock_client) mock_client.service_document.side_effect = MaintenanceError( "Database backend maintenance: Temporarily unavailable, try again later." ) return mock_client def test_url(): assert _url("http://deposit") == "http://deposit/1" assert _url("https://other/1") == "https://other/1" def test_client(): client = _client("http://deposit", "user", "pass") assert isinstance(client, PublicApiDepositClient) def test_collection_error(): mock_client = MagicMock() mock_client.service_document.return_value = {"error": "something went wrong"} with pytest.raises(InputError) as e: _collection(mock_client) assert "Service document retrieval: something went wrong" == str(e.value) def test_collection_ok(): mock_client = MagicMock() mock_client.service_document.return_value = EXAMPLE_SERVICE_DOCUMENT collection_name = _collection(mock_client) assert collection_name == "softcol" def test_collection_ko_because_downtime(): mock_client = MagicMock() mock_client.service_document.side_effect = MaintenanceError("downtime") with pytest.raises(MaintenanceError, match="downtime"): _collection(mock_client) def test_deposit_with_server_down_for_maintenance( sample_archive, mocker, caplog, client_mock_api_down, slug, tmp_path ): """ Deposit failure due to maintenance down time should be explicit """ runner = CliRunner() result = runner.invoke( cli, [ "upload", "--url", "mock://deposit.swh/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", ], ) assert result.exit_code == 1, result.output assert result.output == "" assert caplog.record_tuples == [ ( "swh.deposit.cli.client", logging.ERROR, "Database backend maintenance: Temporarily unavailable, try again later.", ) ] client_mock_api_down.service_document.assert_called_once_with() def test_single_minimal_deposit( sample_archive, mocker, caplog, client_mock, slug, tmp_path ): """ from: https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit """ # noqa metadata_path = os.path.join(tmp_path, "metadata.xml") mocker.patch( - "swh.deposit.cli.client.tempfile.TemporaryDirectory", + "tempfile.TemporaryDirectory", return_value=contextlib.nullcontext(str(tmp_path)), ) runner = CliRunner() result = runner.invoke( cli, [ "upload", "--url", "mock://deposit.swh/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", ], ) assert result.exit_code == 0, result.output assert result.output == "" assert caplog.record_tuples == [ ("swh.deposit.cli.client", logging.INFO, '{"foo": "bar"}'), ] client_mock.deposit_create.assert_called_once_with( archive=sample_archive["path"], collection="softcol", in_progress=False, metadata=metadata_path, slug=slug, ) with open(metadata_path) as fd: assert ( fd.read() == f"""\ \ttest-project \t{slug} \t \t\tJane Doe \t """ ) def test_metadata_validation(sample_archive, mocker, caplog, tmp_path): """ from: https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit """ # noqa slug = generate_slug() mocker.patch("swh.deposit.cli.client.generate_slug", return_value=slug) mock_client = MagicMock() mocker.patch("swh.deposit.cli.client._client", return_value=mock_client) mock_client.service_document.return_value = EXAMPLE_SERVICE_DOCUMENT mock_client.deposit_create.return_value = '{"foo": "bar"}' metadata_path = os.path.join(tmp_path, "metadata.xml") mocker.patch( - "swh.deposit.cli.client.tempfile.TemporaryDirectory", + "tempfile.TemporaryDirectory", return_value=contextlib.nullcontext(str(tmp_path)), ) with open(metadata_path, "a"): pass # creates the file runner = CliRunner() # Test missing author result = runner.invoke( cli, [ "upload", "--url", "mock://deposit.swh/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], ], ) assert result.exit_code == 1, result.output assert result.output == "" assert len(caplog.record_tuples) == 1 (_logger, level, message) = caplog.record_tuples[0] assert level == logging.ERROR assert " --author " in message # Clear mocking state caplog.clear() mock_client.reset_mock() # Test missing name result = runner.invoke( cli, [ "upload", "--url", "mock://deposit.swh/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--author", "Jane Doe", ], ) assert result.exit_code == 1, result.output assert result.output == "" assert len(caplog.record_tuples) == 1 (_logger, level, message) = caplog.record_tuples[0] assert level == logging.ERROR assert " --name " in message # Clear mocking state caplog.clear() mock_client.reset_mock() # Test both --metadata and --author result = runner.invoke( cli, [ "upload", "--url", "mock://deposit.swh/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--metadata", metadata_path, "--author", "Jane Doe", ], ) assert result.exit_code == 1, result.output assert result.output == "" assert len(caplog.record_tuples) == 1 (_logger, level, message) = caplog.record_tuples[0] assert level == logging.ERROR assert re.search("--metadata.*is incompatible with", message) # Clear mocking state caplog.clear() mock_client.reset_mock() def test_single_deposit_slug_generation( sample_archive, mocker, caplog, tmp_path, client_mock ): """ from: https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit """ # noqa slug = "my-slug" collection = "my-collection" metadata_path = os.path.join(tmp_path, "metadata.xml") mocker.patch( - "swh.deposit.cli.client.tempfile.TemporaryDirectory", + "tempfile.TemporaryDirectory", return_value=contextlib.nullcontext(str(tmp_path)), ) runner = CliRunner() result = runner.invoke( cli, [ "upload", "--url", "mock://deposit.swh/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--slug", slug, "--collection", collection, "--author", "Jane Doe", ], ) assert result.exit_code == 0, result.output assert result.output == "" assert caplog.record_tuples == [ ("swh.deposit.cli.client", logging.INFO, '{"foo": "bar"}'), ] client_mock.deposit_create.assert_called_once_with( archive=sample_archive["path"], collection=collection, in_progress=False, metadata=metadata_path, slug=slug, ) with open(metadata_path) as fd: assert ( fd.read() == """\ \ttest-project \tmy-slug \t \t\tJane Doe \t """ ) def test_multisteps_deposit( sample_archive, atom_dataset, mocker, caplog, datadir, client_mock, slug ): """ from: https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#multisteps-deposit """ # noqa slug = generate_slug() mocker.patch("swh.deposit.cli.client.generate_slug", return_value=slug) # https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#create-an-incomplete-deposit client_mock.deposit_create.return_value = '{"deposit_id": "42"}' runner = CliRunner() result = runner.invoke( cli, [ "upload", "--url", "mock://deposit.swh/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--partial", ], ) assert result.exit_code == 0, result.output assert result.output == "" assert caplog.record_tuples == [ ("swh.deposit.cli.client", logging.INFO, '{"deposit_id": "42"}'), ] client_mock.deposit_create.assert_called_once_with( archive=sample_archive["path"], collection="softcol", in_progress=True, metadata=None, slug=slug, ) # Clear mocking state caplog.clear() client_mock.reset_mock() # https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#add-content-or-metadata-to-the-deposit metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml") result = runner.invoke( cli, [ "upload", "--url", "mock://deposit.swh/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, ], ) assert result.exit_code == 0, result.output assert result.output == "" assert caplog.record_tuples == [ ("swh.deposit.cli.client", logging.INFO, '{"deposit_id": "42"}'), ] client_mock.deposit_create.assert_called_once_with( archive=None, collection="softcol", in_progress=False, metadata=metadata_path, slug=slug, ) # Clear mocking state caplog.clear() client_mock.reset_mock()