diff --git a/swh/deposit/api/checks.py b/swh/deposit/api/checks.py index ad7ba48d..bf310e35 100644 --- a/swh/deposit/api/checks.py +++ b/swh/deposit/api/checks.py @@ -1,74 +1,92 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Functional Metadata checks: Mandatory fields: - 'author' - 'name' or 'title' +Suggested fields: +- metadata-provenance + """ from typing import Dict, Optional, Tuple import iso8601 -from swh.deposit.utils import normalize_date +from swh.deposit.utils import normalize_date, parse_swh_metadata_provenance MANDATORY_FIELDS_MISSING = "Mandatory fields are missing" INVALID_DATE_FORMAT = "Invalid date format" +SUGGESTED_FIELDS_MISSING = "Suggested fields are missing" +METADATA_PROVENANCE_KEY = "swh:metadata-provenance" + def check_metadata(metadata: Dict) -> Tuple[bool, Optional[Dict]]: """Check metadata for mandatory field presence and date format. Args: metadata: Metadata dictionary to check Returns: - tuple (status, error_detail): True, None if metadata are - ok (False, ) otherwise. + tuple (status, error_detail): + - (True, None) if metadata are ok and suggested fields are also present + - (True, ) if metadata are ok but some suggestions are missing + - (False, ) otherwise. """ + suggested_fields = [] # at least one value per couple below is mandatory alternate_fields = { ("atom:name", "atom:title", "codemeta:name"): False, ("atom:author", "codemeta:author"): False, } for field, value in metadata.items(): for possible_names in alternate_fields: if field in possible_names: alternate_fields[possible_names] = True continue mandatory_result = [" or ".join(k) for k, v in alternate_fields.items() if not v] + # provenance metadata is optional + provenance_meta = parse_swh_metadata_provenance(metadata) + if provenance_meta is None: + suggested_fields = [ + {"summary": SUGGESTED_FIELDS_MISSING, "fields": [METADATA_PROVENANCE_KEY]} + ] + if mandatory_result: detail = [{"summary": MANDATORY_FIELDS_MISSING, "fields": mandatory_result}] - return False, {"metadata": detail} + return False, {"metadata": detail + suggested_fields} fields = [] commit_date = metadata.get("codemeta:datePublished") author_date = metadata.get("codemeta:dateCreated") if commit_date: try: normalize_date(commit_date) except iso8601.iso8601.ParseError: fields.append("codemeta:datePublished") if author_date: try: normalize_date(author_date) except iso8601.iso8601.ParseError: fields.append("codemeta:dateCreated") if fields: detail = [{"summary": INVALID_DATE_FORMAT, "fields": fields}] - return False, {"metadata": detail} + return False, {"metadata": detail + suggested_fields} + if suggested_fields: # it's fine but warn about missing suggested fields + return True, {"metadata": suggested_fields} return True, None diff --git a/swh/deposit/tests/api/test_checks.py b/swh/deposit/tests/api/test_checks.py index 226e83ee..f770cf07 100644 --- a/swh/deposit/tests/api/test_checks.py +++ b/swh/deposit/tests/api/test_checks.py @@ -1,125 +1,189 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Any, Dict + import pytest -from swh.deposit.api.checks import check_metadata +from swh.deposit.api.checks import ( + METADATA_PROVENANCE_KEY, + SUGGESTED_FIELDS_MISSING, + check_metadata, +) + +METADATA_PROVENANCE_DICT: Dict[str, Any] = { + "swh:deposit": { + METADATA_PROVENANCE_KEY: {"schema:url": "some-metadata-provenance-url"} + } +} @pytest.mark.parametrize( "metadata_ok", [ { "atom:url": "something", "atom:external_identifier": "something-else", "atom:name": "foo", "atom:author": "someone", + **METADATA_PROVENANCE_DICT, }, { "atom:url": "some url", "atom:external_identifier": "some id", "atom:title": "bar", "atom:author": "no one", + **METADATA_PROVENANCE_DICT, + }, + { + "atom:url": "some url", + "codemeta:name": "bar", + "codemeta:author": "no one", + **METADATA_PROVENANCE_DICT, }, - {"atom:url": "some url", "codemeta:name": "bar", "codemeta:author": "no one",}, { "atom:url": "some url", "atom:external_identifier": "some id", "atom:title": "bar", "atom:author": "no one", "codemeta:datePublished": "2020-12-21", "codemeta:dateCreated": "2020-12-21", }, ], ) def test_api_checks_check_metadata_ok(metadata_ok, swh_checks_deposit): actual_check, detail = check_metadata(metadata_ok) - assert actual_check is True, detail - assert detail is None + assert actual_check is True, f"Unexpected result: {detail}" + if "swh:deposit" in metadata_ok: + # no missing suggested field + assert detail is None + else: + # missing suggested field + assert detail == { + "metadata": [ + { + "fields": [METADATA_PROVENANCE_KEY], + "summary": SUGGESTED_FIELDS_MISSING, + } + ] + } @pytest.mark.parametrize( "metadata_ko,expected_summary", [ ( { "atom:url": "something", "atom:external_identifier": "something-else", "atom:author": "someone", + **METADATA_PROVENANCE_DICT, }, { "summary": "Mandatory fields are missing", "fields": ["atom:name or atom:title or codemeta:name"], }, ), ( { "atom:url": "something", "atom:external_identifier": "something-else", "atom:title": "foobar", + **METADATA_PROVENANCE_DICT, }, { "summary": "Mandatory fields are missing", "fields": ["atom:author or codemeta:author"], }, ), ( { "atom:url": "something", "atom:external_identifier": "something-else", "codemeta:title": "bar", "atom:author": "someone", + **METADATA_PROVENANCE_DICT, }, { "summary": "Mandatory fields are missing", "fields": ["atom:name or atom:title or codemeta:name"], }, ), ( { "atom:url": "something", "atom:external_identifier": "something-else", "atom:title": "foobar", "author": "foo", + **METADATA_PROVENANCE_DICT, }, { "summary": "Mandatory fields are missing", "fields": ["atom:author or codemeta:author"], }, ), ( { "atom:url": "something", "atom:external_identifier": "something-else", "atom:title": "foobar", "atom:authorblahblah": "foo", + **METADATA_PROVENANCE_DICT, }, { "summary": "Mandatory fields are missing", "fields": ["atom:author or codemeta:author"], }, ), + ( + { + "atom:url": "something", + "atom:external_identifier": "something-else", + "atom:author": "someone", + **METADATA_PROVENANCE_DICT, + }, + { + "summary": "Mandatory fields are missing", + "fields": ["atom:name or atom:title or codemeta:name"], + }, + ), + ], +) +def test_api_checks_check_metadata_ko( + metadata_ko, expected_summary, swh_checks_deposit +): + actual_check, error_detail = check_metadata(metadata_ko) + assert actual_check is False + assert error_detail == {"metadata": [expected_summary]} + + +@pytest.mark.parametrize( + "metadata_ko,expected_invalid_summary", + [ ( { "atom:url": "some url", "atom:external_identifier": "some id", "atom:title": "bar", "atom:author": "no one", "codemeta:datePublished": "2020-aa-21", "codemeta:dateCreated": "2020-12-bb", }, { "summary": "Invalid date format", "fields": ["codemeta:datePublished", "codemeta:dateCreated"], }, ), ], ) -def test_api_checks_check_metadata_ko( - metadata_ko, expected_summary, swh_checks_deposit +def test_api_checks_check_metadata_fields_ko_and_missing_suggested_fields( + metadata_ko, expected_invalid_summary, swh_checks_deposit ): actual_check, error_detail = check_metadata(metadata_ko) assert actual_check is False - assert error_detail == {"metadata": [expected_summary]} + assert error_detail == { + "metadata": [expected_invalid_summary] + + [{"fields": [METADATA_PROVENANCE_KEY], "summary": SUGGESTED_FIELDS_MISSING,}] + } diff --git a/swh/deposit/tests/api/test_deposit_private_check.py b/swh/deposit/tests/api/test_deposit_private_check.py index 5fe2d3b5..d4fd5bd7 100644 --- a/swh/deposit/tests/api/test_deposit_private_check.py +++ b/swh/deposit/tests/api/test_deposit_private_check.py @@ -1,211 +1,218 @@ -# Copyright (C) 2017-2021 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.urls import reverse_lazy as reverse import pytest from rest_framework import status -from swh.deposit.api.checks import MANDATORY_FIELDS_MISSING +from swh.deposit.api.checks import ( + MANDATORY_FIELDS_MISSING, + METADATA_PROVENANCE_KEY, + SUGGESTED_FIELDS_MISSING, +) from swh.deposit.api.private.deposit_check import ( MANDATORY_ARCHIVE_INVALID, MANDATORY_ARCHIVE_MISSING, MANDATORY_ARCHIVE_UNSUPPORTED, ) from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT, ) from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import ( create_arborescence_archive, create_archive_with_archive, ) PRIVATE_CHECK_DEPOSIT_NC = PRIVATE_CHECK_DEPOSIT + "-nc" def private_check_url_endpoints(collection, deposit): """There are 2 endpoints to check (one with collection, one without)""" return [ reverse(PRIVATE_CHECK_DEPOSIT, args=[collection.name, deposit.id]), reverse(PRIVATE_CHECK_DEPOSIT_NC, args=[deposit.id]), ] @pytest.mark.parametrize("extension", ["zip", "tar", "tar.gz", "tar.bz2", "tar.xz"]) def test_deposit_ok( authenticated_client, deposit_collection, ready_deposit_ok, extension ): """Proper deposit should succeed the checks (-> status ready) """ deposit = ready_deposit_ok for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_VERIFIED deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_VERIFIED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() @pytest.mark.parametrize("extension", ["zip", "tar", "tar.gz", "tar.bz2", "tar.xz"]) def test_deposit_invalid_tarball( tmp_path, authenticated_client, deposit_collection, extension ): """Deposit with tarball (of 1 tarball) should fail the checks: rejected """ deposit = create_deposit_archive_with_archive( tmp_path, extension, authenticated_client, deposit_collection.name ) for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_REJECTED details = data["details"] # archive checks failure assert len(details["archive"]) == 1 assert details["archive"][0]["summary"] == MANDATORY_ARCHIVE_INVALID deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED def test_deposit_ko_missing_tarball( authenticated_client, deposit_collection, ready_deposit_only_metadata ): """Deposit without archive should fail the checks: rejected """ deposit = ready_deposit_only_metadata assert deposit.status == DEPOSIT_STATUS_DEPOSITED for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_REJECTED details = data["details"] # archive checks failure assert len(details["archive"]) == 1 assert details["archive"][0]["summary"] == MANDATORY_ARCHIVE_MISSING deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def test_deposit_ko_unsupported_tarball( tmp_path, authenticated_client, deposit_collection, ready_deposit_invalid_archive ): """Deposit with an unsupported tarball should fail the checks: rejected """ deposit = ready_deposit_invalid_archive assert DEPOSIT_STATUS_DEPOSITED == deposit.status for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_REJECTED details = data["details"] # archive checks failure assert len(details["archive"]) == 1 assert details["archive"][0]["summary"] == MANDATORY_ARCHIVE_UNSUPPORTED # metadata check failure - assert len(details["metadata"]) == 1 + assert len(details["metadata"]) == 2 mandatory = details["metadata"][0] assert mandatory["summary"] == MANDATORY_FIELDS_MISSING assert set(mandatory["fields"]) == set( [ "atom:author or codemeta:author", "atom:name or atom:title or codemeta:name", ] ) + suggested = details["metadata"][1] + assert suggested["summary"] == SUGGESTED_FIELDS_MISSING + assert set(suggested["fields"]) == set([METADATA_PROVENANCE_KEY]) deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def test_check_deposit_metadata_ok( authenticated_client, deposit_collection, ready_deposit_ok ): """Proper deposit should succeed the checks (-> status ready) with all **MUST** metadata using the codemeta metadata test set """ deposit = ready_deposit_ok assert deposit.status == DEPOSIT_STATUS_DEPOSITED for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data["status"] == DEPOSIT_STATUS_VERIFIED deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_VERIFIED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def create_deposit_archive_with_archive( root_path, archive_extension, client, collection_name ): # we create the holding archive to a given extension archive = create_arborescence_archive( root_path, "archive1", "file1", b"some content in file", extension=archive_extension, ) # now we create an archive holding the first created archive invalid_archive = create_archive_with_archive(root_path, "invalid.tgz", archive) # we deposit it response = client.post( reverse(COL_IRI, args=[collection_name]), content_type="application/x-tar", data=invalid_archive["data"], CONTENT_LENGTH=invalid_archive["length"], HTTP_MD5SUM=invalid_archive["md5sum"], HTTP_SLUG="external-id", HTTP_IN_PROGRESS=False, HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (invalid_archive["name"],), ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_status = response_content["swh:deposit_status"] assert deposit_status == DEPOSIT_STATUS_DEPOSITED deposit_id = int(response_content["swh:deposit_id"]) deposit = Deposit.objects.get(pk=deposit_id) assert DEPOSIT_STATUS_DEPOSITED == deposit.status return deposit diff --git a/swh/deposit/tests/cli/test_client.py b/swh/deposit/tests/cli/test_client.py index 8c8ec3bd..f9dfa6d5 100644 --- a/swh/deposit/tests/cli/test_client.py +++ b/swh/deposit/tests/cli/test_client.py @@ -1,1033 +1,1047 @@ -# Copyright (C) 2020-2021 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import ast from collections import OrderedDict import contextlib import json import logging import os from unittest.mock import MagicMock import pytest import yaml -from swh.deposit.api.checks import check_metadata +from swh.deposit.api.checks import ( + METADATA_PROVENANCE_KEY, + SUGGESTED_FIELDS_MISSING, + check_metadata, +) from swh.deposit.cli import deposit as cli from swh.deposit.cli.client import InputError, _collection, _url, generate_metadata from swh.deposit.client import ( BaseDepositClient, MaintenanceError, PublicApiDepositClient, ServiceDocumentDepositClient, ) from swh.deposit.parsers import parse_xml from swh.model.exceptions import ValidationError from ..conftest import TEST_USER def generate_slug() -> str: """Generate a slug (sample purposes). """ import uuid return str(uuid.uuid4()) @pytest.fixture def datadir(request): """Override default datadir to target main test datadir""" return os.path.join(os.path.dirname(str(request.fspath)), "../data") @pytest.fixture def slug(): return generate_slug() @pytest.fixture def patched_tmp_path(tmp_path, mocker): mocker.patch( "tempfile.TemporaryDirectory", return_value=contextlib.nullcontext(str(tmp_path)), ) return tmp_path @pytest.fixture def client_mock_api_down(mocker, slug): """A mock client whose connection with api fails due to maintenance issue """ mock_client = MagicMock() mocker.patch("swh.deposit.client.PublicApiDepositClient", return_value=mock_client) mock_client.service_document.side_effect = MaintenanceError( "Database backend maintenance: Temporarily unavailable, try again later." ) return mock_client def test_cli_url(): assert _url("http://deposit") == "http://deposit/1" assert _url("https://other/1") == "https://other/1" def test_cli_collection_error(): mock_client = MagicMock() mock_client.service_document.return_value = {"error": "something went wrong"} with pytest.raises(InputError) as e: _collection(mock_client) assert "Service document retrieval: something went wrong" == str(e.value) def test_cli_collection_ok(requests_mock_datadir): client = PublicApiDepositClient( url="https://deposit.swh.test/1", auth=("test", "test") ) collection_name = _collection(client) assert collection_name == "test" def test_cli_collection_ko_because_downtime(): mock_client = MagicMock() mock_client.service_document.side_effect = MaintenanceError("downtime") with pytest.raises(MaintenanceError, match="downtime"): _collection(mock_client) def test_cli_upload_conflictual_flags( datadir, requests_mock_datadir, cli_runner, atom_dataset, tmp_path, ): """Post metadata-only deposit through cli with invalid swhid raises """ api_url_basename = "deposit.test.metadataonly" metadata = atom_dataset["entry-data-minimal"] metadata_path = os.path.join(tmp_path, "entry-data-minimal.xml") with open(metadata_path, "w") as f: f.write(metadata) with pytest.raises(InputError, match="both with different values"): # fmt: off cli_runner.invoke( cli, [ "upload", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--slug", "some-slug", # deprecated flag "--create-origin", "some-other-slug", # conflictual value, so raise "--format", "json", ], catch_exceptions=False, ) # fmt: on def test_cli_deposit_with_server_down_for_maintenance( sample_archive, caplog, client_mock_api_down, slug, patched_tmp_path, cli_runner ): """ Deposit failure due to maintenance down time should be explicit """ # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", ], ) # fmt: on assert result.exit_code == 1, result.output assert result.output == "" down_for_maintenance_log_record = ( "swh.deposit.cli.client", logging.ERROR, "Database backend maintenance: Temporarily unavailable, try again later.", ) assert down_for_maintenance_log_record in caplog.record_tuples client_mock_api_down.service_document.assert_called_once_with() def test_cli_client_generate_metadata_ok(slug): """Generated metadata is well formed and pass service side metadata checks """ actual_metadata_xml = generate_metadata( "deposit-client", "project-name", authors=["some", "authors"], external_id="external-id", create_origin="origin-url", ) actual_metadata = dict(parse_xml(actual_metadata_xml)) assert actual_metadata["atom:author"] == "deposit-client" assert actual_metadata["atom:title"] == "project-name" assert actual_metadata["atom:updated"] is not None assert actual_metadata["codemeta:name"] == "project-name" assert actual_metadata["codemeta:identifier"] == "external-id" assert actual_metadata["codemeta:author"] == [ OrderedDict([("codemeta:name", "some")]), OrderedDict([("codemeta:name", "authors")]), ] assert ( actual_metadata["swh:deposit"]["swh:create_origin"]["swh:origin"]["@url"] == "origin-url" ) checks_ok, detail = check_metadata(actual_metadata) assert checks_ok is True - assert detail is None + # FIXME: Open the flag to suggest the provenance metadata url in the cli + assert detail == { + "metadata": [ + {"summary": SUGGESTED_FIELDS_MISSING, "fields": [METADATA_PROVENANCE_KEY]} + ] + } def test_cli_client_generate_metadata_ok2(slug): """Generated metadata is well formed and pass service side metadata checks """ actual_metadata_xml = generate_metadata( "deposit-client", "project-name", authors=["some", "authors"], ) actual_metadata = dict(parse_xml(actual_metadata_xml)) assert actual_metadata["atom:author"] == "deposit-client" assert actual_metadata["atom:title"] == "project-name" assert actual_metadata["atom:updated"] is not None assert actual_metadata["codemeta:name"] == "project-name" assert actual_metadata["codemeta:author"] == [ OrderedDict([("codemeta:name", "some")]), OrderedDict([("codemeta:name", "authors")]), ] assert actual_metadata.get("codemeta:identifier") is None assert actual_metadata.get("swh:deposit") is None checks_ok, detail = check_metadata(actual_metadata) assert checks_ok is True - assert detail is None + # FIXME: Open the flag to suggest the provenance metadata url in the cli + assert detail == { + "metadata": [ + {"summary": SUGGESTED_FIELDS_MISSING, "fields": [METADATA_PROVENANCE_KEY]} + ] + } def test_cli_single_minimal_deposit_with_slug( sample_archive, slug, patched_tmp_path, requests_mock_datadir, cli_runner, caplog, ): """ This ensure a single deposit upload through the cli is fine, cf. https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit """ # noqa metadata_path = os.path.join(patched_tmp_path, "metadata.xml") # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", "--slug", slug, "--format", "json", ], ) # fmt: on assert result.exit_code == 0, result.output assert json.loads(result.output) == { "deposit_id": "615", "deposit_status": "partial", "deposit_status_detail": None, "deposit_date": "Oct. 8, 2020, 4:57 p.m.", } with open(metadata_path) as fd: actual_metadata = dict(parse_xml(fd.read())) assert actual_metadata["atom:author"] == TEST_USER["username"] assert actual_metadata["codemeta:name"] == "test-project" assert actual_metadata["atom:title"] == "test-project" assert actual_metadata["atom:updated"] is not None assert actual_metadata["codemeta:identifier"] == slug assert actual_metadata["codemeta:author"] == OrderedDict( [("codemeta:name", "Jane Doe")] ) count_warnings = 0 for (_, log_level, _) in caplog.record_tuples: count_warnings += 1 if log_level == logging.WARNING else 0 assert ( count_warnings == 1 ), "We should have 1 warning as we are using slug instead of create_origin" def test_cli_single_minimal_deposit_with_create_origin( sample_archive, slug, patched_tmp_path, requests_mock_datadir, cli_runner, caplog, ): """ This ensure a single deposit upload through the cli is fine, cf. https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit """ # noqa metadata_path = os.path.join(patched_tmp_path, "metadata.xml") origin = slug # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", "--create-origin", origin, "--format", "json", ], ) # fmt: on assert result.exit_code == 0, result.output assert json.loads(result.output) == { "deposit_id": "615", "deposit_status": "partial", "deposit_status_detail": None, "deposit_date": "Oct. 8, 2020, 4:57 p.m.", } with open(metadata_path) as fd: actual_metadata = dict(parse_xml(fd.read())) assert actual_metadata["atom:author"] == TEST_USER["username"] assert actual_metadata["codemeta:name"] == "test-project" assert actual_metadata["atom:title"] == "test-project" assert actual_metadata["atom:updated"] is not None assert ( actual_metadata["swh:deposit"]["swh:create_origin"]["swh:origin"]["@url"] == origin ) assert actual_metadata["codemeta:author"] == OrderedDict( [("codemeta:name", "Jane Doe")] ) count_warnings = 0 for (_, log_level, _) in caplog.record_tuples: count_warnings += 1 if log_level == logging.WARNING else 0 assert ( count_warnings == 0 ), "We should have no warning as we are using create_origin" def test_cli_validation_metadata( sample_archive, caplog, patched_tmp_path, cli_runner, slug ): """Multiple metadata flags scenario (missing, conflicts) properly fails the calls """ metadata_path = os.path.join(patched_tmp_path, "metadata.xml") with open(metadata_path, "a"): pass # creates the file for flag_title_or_name, author_or_name in [ ("--author", "no one"), ("--name", "test-project"), ]: # Test missing author then missing name # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--slug", slug, flag_title_or_name, author_or_name, ], ) # fmt: on assert result.exit_code == 1, f"unexpected result: {result.output}" assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided. " ), ) assert expected_error_log_record in caplog.record_tuples # Clear mocking state caplog.clear() # incompatible flags: Test both --metadata and --author, then --metadata and # --name # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--deposit-id", 666, "--archive", sample_archive["path"], "--slug", slug, ], ) # fmt: on assert result.exit_code == 1, f"unexpected result: {result.output}" assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided." ), ) assert expected_error_log_record in caplog.record_tuples # Clear mocking state caplog.clear() # incompatible flags check (Test both --metadata and --author, # then --metadata and --name) # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--metadata", metadata_path, "--author", "Jane Doe", "--slug", slug, ], ) # fmt: on assert result.exit_code == 1, result.output assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "Using --metadata flag is incompatible with --author " "and --name and --create-origin (those are used to generate " "one metadata file)." ), ) assert expected_error_log_record in caplog.record_tuples caplog.clear() def test_cli_validation_no_actionable_command(caplog, cli_runner): """Multiple metadata flags scenario (missing, conflicts) properly fails the calls """ # no actionable command # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--partial", ], ) # fmt: on assert result.exit_code == 1, result.output assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "Please provide an actionable command. See --help for more information" ), ) assert expected_error_log_record in caplog.record_tuples def test_cli_validation_replace_with_no_deposit_id_fails( sample_archive, caplog, patched_tmp_path, requests_mock_datadir, datadir, cli_runner ): """--replace flags require --deposit-id otherwise fails """ metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml") # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--archive", sample_archive["path"], "--replace", ], ) # fmt: on assert result.exit_code == 1, result.output assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "To update an existing deposit, you must provide its id" ), ) assert expected_error_log_record in caplog.record_tuples def test_cli_single_deposit_slug_generation( sample_archive, patched_tmp_path, requests_mock_datadir, cli_runner ): """Single deposit scenario without providing the slug, it should not be generated. """ metadata_path = os.path.join(patched_tmp_path, "metadata.xml") # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", "--format", "json", ], ) # fmt: on assert result.exit_code == 0, result.output assert json.loads(result.output) == { "deposit_id": "615", "deposit_status": "partial", "deposit_status_detail": None, "deposit_date": "Oct. 8, 2020, 4:57 p.m.", } with open(metadata_path) as fd: metadata_xml = fd.read() actual_metadata = dict(parse_xml(metadata_xml)) assert "codemeta:identifier" not in actual_metadata def test_cli_multisteps_deposit( sample_archive, datadir, slug, requests_mock_datadir, cli_runner ): """ First deposit a partial deposit (no metadata, only archive), then update the metadata part. https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#multisteps-deposit """ # noqa api_url = "https://deposit.test.metadata/1" deposit_id = 666 # Create a partial deposit with only 1 archive # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", api_url, "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--slug", slug, "--format", "json", "--partial", ], ) # fmt: on assert result.exit_code == 0, f"unexpected output: {result.output}" actual_deposit = json.loads(result.output) assert actual_deposit == { "deposit_id": str(deposit_id), "deposit_status": "partial", "deposit_status_detail": None, "deposit_date": "Oct. 8, 2020, 4:57 p.m.", } # Update the partial deposit with only 1 archive # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", api_url, "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--deposit-id", deposit_id, "--slug", slug, "--format", "json", "--partial", # in-progress: True, because remains the metadata to upload ], ) # fmt: on assert result.exit_code == 0, f"unexpected output: {result.output}" assert result.output is not None actual_deposit = json.loads(result.output) # deposit update scenario actually returns a deposit status dict assert actual_deposit["deposit_id"] == str(deposit_id) assert actual_deposit["deposit_status"] == "partial" # Update the partial deposit with only some metadata (and then finalize it) # https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#add-content-or-metadata-to-the-deposit metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml") # Update deposit with metadata # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", api_url, "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--deposit-id", deposit_id, "--slug", slug, "--format", "json", ], # this time, ^ we no longer flag it to partial, so the status changes to # in-progress false ) # fmt: on assert result.exit_code == 0, f"unexpected output: {result.output}" assert result.output is not None actual_deposit = json.loads(result.output) # deposit update scenario actually returns a deposit status dict assert actual_deposit["deposit_id"] == str(deposit_id) # FIXME: should be "deposited" but current limitation in the # requests_mock_datadir_visits use, cannot find a way to make it work right now assert actual_deposit["deposit_status"] == "partial" @pytest.mark.parametrize( "output_format,parser_fn", [ ("json", json.loads), ("yaml", yaml.safe_load), ( "logging", ast.literal_eval, ), # not enough though, the caplog fixture is needed ], ) def test_cli_deposit_status_with_output_format( output_format, parser_fn, datadir, slug, requests_mock_datadir, caplog, cli_runner ): """Check deposit status cli with all possible output formats (json, yaml, logging). """ api_url_basename = "deposit.test.status" deposit_id = 1033 expected_deposit_status = { "deposit_id": str(deposit_id), "deposit_status": "done", "deposit_status_detail": ( "The deposit has been successfully loaded into the " "Software Heritage archive" ), "deposit_swh_id": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea", "deposit_swh_id_context": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/", # noqa "deposit_external_id": "check-deposit-2020-10-08T13:52:34.509655", } # fmt: off result = cli_runner.invoke( cli, [ "status", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--deposit-id", deposit_id, "--format", output_format, ], ) # fmt: on assert result.exit_code == 0, f"unexpected output: {result.output}" if output_format == "logging": assert len(caplog.record_tuples) == 1 # format: (, , ) _, _, result_output = caplog.record_tuples[0] else: result_output = result.output actual_deposit = parser_fn(result_output) assert actual_deposit == expected_deposit_status def test_cli_update_metadata_with_swhid_on_completed_deposit( datadir, requests_mock_datadir, cli_runner ): """Update new metadata on a completed deposit (status done) is ok """ api_url_basename = "deposit.test.updateswhid" deposit_id = 123 expected_deposit_status = { "deposit_external_id": "check-deposit-2020-10-08T13:52:34.509655", "deposit_id": str(deposit_id), "deposit_status": "done", "deposit_status_detail": ( "The deposit has been successfully loaded into the " "Software Heritage archive" ), "deposit_swh_id": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea", "deposit_swh_id_context": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/", # noqa } assert expected_deposit_status["deposit_status"] == "done" assert expected_deposit_status["deposit_swh_id"] is not None # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--author", "John Doe", "--deposit-id", deposit_id, "--swhid", expected_deposit_status["deposit_swh_id"], "--format", "json", ], ) # fmt: on assert result.exit_code == 0, result.output actual_deposit_status = json.loads(result.output) assert "error" not in actual_deposit_status assert actual_deposit_status == expected_deposit_status def test_cli_update_metadata_with_swhid_on_other_status_deposit( datadir, requests_mock_datadir, cli_runner ): """Update new metadata with swhid on other deposit status is not possible """ api_url_basename = "deposit.test.updateswhid" deposit_id = "321" # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--author", "John Doe", "--deposit-id", deposit_id, "--swhid", "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea", "--format", "json", ], ) # fmt: on assert result.exit_code == 0, result.output actual_result = json.loads(result.output) assert "error" in actual_result assert actual_result == { "error": "You can only update metadata on deposit with status 'done'", "detail": f"The deposit {deposit_id} has status 'partial'", "deposit_status": "partial", "deposit_id": deposit_id, } def test_cli_metadata_only_deposit_full_metadata_file( datadir, requests_mock_datadir, cli_runner, atom_dataset, tmp_path, ): """Post metadata-only deposit through cli The metadata file posted by the client already contains the swhid """ api_url_basename = "deposit.test.metadataonly" swhid = "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea" metadata = atom_dataset["entry-data-with-swhid"].format(swhid=swhid) metadata_path = os.path.join(tmp_path, "entry-data-with-swhid.xml") with open(metadata_path, "w") as m: m.write(metadata) expected_deposit_status = { "deposit_id": "100", "deposit_status": "done", "deposit_date": "2020-10-08T13:52:34.509655", } assert expected_deposit_status["deposit_status"] == "done" # fmt: off result = cli_runner.invoke( cli, [ "metadata-only", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--format", "json", ], ) # fmt: on assert result.exit_code == 0, result.output actual_deposit_status = json.loads(result.output) assert "error" not in actual_deposit_status assert actual_deposit_status == expected_deposit_status def test_cli_metadata_only_deposit_invalid_swhid( datadir, requests_mock_datadir, cli_runner, atom_dataset, tmp_path, ): """Post metadata-only deposit through cli with invalid swhid raises """ api_url_basename = "deposit.test.metadataonly" invalid_swhid = "ssh:2:sth:xxx" metadata = atom_dataset["entry-data-with-swhid"].format(swhid=invalid_swhid) metadata_path = os.path.join(tmp_path, "entry-data-with-swhid.xml") with open(metadata_path, "w") as f: f.write(metadata) with pytest.raises(ValidationError, match="Invalid"): # fmt: off cli_runner.invoke( cli, [ "metadata-only", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--format", "json", ], catch_exceptions=False, ) # fmt: on def test_cli_metadata_only_deposit_no_swhid( datadir, requests_mock_datadir, cli_runner, atom_dataset, tmp_path, ): """Post metadata-only deposit through cli with invalid swhid raises """ api_url_basename = "deposit.test.metadataonly" metadata = atom_dataset["entry-data-minimal"] metadata_path = os.path.join(tmp_path, "entry-data-minimal.xml") with open(metadata_path, "w") as f: f.write(metadata) with pytest.raises(InputError, match="SWHID must be provided"): # fmt: off cli_runner.invoke( cli, [ "metadata-only", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--format", "json", ], catch_exceptions=False, ) # fmt: on @pytest.mark.parametrize( "metadata_entry_key", ["entry-data-with-add-to-origin", "entry-only-create-origin"] ) def test_cli_deposit_warning_missing_origin( sample_archive, metadata_entry_key, tmp_path, atom_dataset, caplog, cli_runner, requests_mock_datadir, ): """Deposit cli should log warning when the provided metadata xml is missing origins """ # For the next deposit, no warning should be logged as either or # are provided metadata_raw = atom_dataset[metadata_entry_key] % "some-url" metadata_path = os.path.join(tmp_path, "metadata-with-origin-tag-to-deposit.xml") with open(metadata_path, "w") as f: f.write(metadata_raw) # fmt: off cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, ], ) # fmt: on for (_, log_level, _) in caplog.record_tuples: # all messages are info or below messages so everything is fine assert log_level < logging.WARNING def test_cli_failure_should_be_parseable(atom_dataset, mocker): summary = "Cannot load metadata" verbose_description = ( "Cannot load metadata on swh:1:dir:0eda267e7d3c2e37b3f6a78e542b16190ac4574e, " "this directory object does not exist in the archive (yet?)." ) error_xml = atom_dataset["error-cli"].format( summary=summary, verboseDescription=verbose_description ) api_call = BaseDepositClient(url="https://somewhere.org/") actual_error = api_call.parse_result_error(error_xml) assert actual_error == { "summary": summary, "detail": "", "sword:verboseDescription": verbose_description, } def test_cli_service_document_failure(atom_dataset, mocker): """Ensure service document failures are properly served """ summary = "Invalid user credentials" error_xml = atom_dataset["error-cli"].format(summary=summary, verboseDescription="") api_call = ServiceDocumentDepositClient(url="https://somewhere.org/") actual_error = api_call.parse_result_error(error_xml) assert actual_error == {"error": summary} @pytest.mark.parametrize( "output_format,parser_fn", [ ("json", json.loads), ("yaml", yaml.safe_load), ( "logging", ast.literal_eval, ), # not enough though, the caplog fixture is needed ], ) def test_cli_deposit_collection_list( output_format, parser_fn, datadir, slug, requests_mock_datadir, caplog, cli_runner ): """Check deposit status cli with all possible output formats (json, yaml, logging). """ api_url_basename = "deposit.test.list" expected_deposits = { "count": "3", "deposits": [ { "external_id": "check-deposit-2020-10-09T13:10:00.000000", "id": "1031", "status": "rejected", "status_detail": "Deposit without archive", }, { "external_id": "check-deposit-2020-10-10T13:20:00.000000", "id": "1032", "status": "rejected", "status_detail": "Deposit without archive", }, { "complete_date": "2020-10-08T13:52:34.509655", "external_id": "check-deposit-2020-10-08T13:52:34.509655", "id": "1033", "reception_date": "2020-10-08T13:50:30", "status": "done", "status_detail": "The deposit has been successfully loaded into " "the Software Heritage archive", "swhid": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea", "swhid_context": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/", # noqa }, ], } # fmt: off result = cli_runner.invoke( cli, [ "list", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--page", 1, "--page-size", 10, "--format", output_format, ], ) # fmt: on assert result.exit_code == 0, f"unexpected output: {result.output}" if output_format == "logging": assert len(caplog.record_tuples) == 1 # format: (, , ) _, _, result_output = caplog.record_tuples[0] else: result_output = result.output actual_deposit = parser_fn(result_output) assert actual_deposit == expected_deposits