diff --git a/swh/deposit/cli/client.py b/swh/deposit/cli/client.py index a969b858..bf5cfc3f 100644 --- a/swh/deposit/cli/client.py +++ b/swh/deposit/cli/client.py @@ -1,488 +1,538 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from contextlib import contextmanager from datetime import datetime, timezone import logging # WARNING: do not import unnecessary things here to keep cli startup time under # control import os import sys from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional import warnings import click from swh.deposit.cli import deposit logger = logging.getLogger(__name__) if TYPE_CHECKING: from swh.deposit.client import PublicApiDepositClient class InputError(ValueError): """Input script error """ pass @contextmanager def trap_and_report_exceptions(): """Trap and report exceptions (InputError, MaintenanceError) in a unified way. """ from swh.deposit.client import MaintenanceError try: yield except InputError as e: logger.error("Problem during parsing options: %s", e) sys.exit(1) except MaintenanceError as e: logger.error(e) sys.exit(1) def generate_slug() -> str: """Generate a slug (sample purposes). """ import uuid return str(uuid.uuid4()) def _url(url: str) -> str: """Force the /1 api version at the end of the url (avoiding confusing issues without it). Args: url (str): api url used by cli users Returns: Top level api url to actually request """ if not url.endswith("/1"): url = "%s/1" % url return url def generate_metadata( deposit_client: str, name: str, external_id: Optional[str], authors: List[str] ) -> str: """Generate sword compliant xml metadata with the minimum required metadata. The Atom spec, https://tools.ietf.org/html/rfc4287, says that: - atom:entry elements MUST contain one or more atom:author elements - atom:entry elements MUST contain exactly one atom:title element. - atom:entry elements MUST contain exactly one atom:updated element. However, we are also using CodeMeta, so we want some basic information to be mandatory. Therefore, we generate the following mandatory fields: - http://www.w3.org/2005/Atom#updated - http://www.w3.org/2005/Atom#author - http://www.w3.org/2005/Atom#title - https://doi.org/10.5063/SCHEMA/CODEMETA-2.0#name (yes, in addition to http://www.w3.org/2005/Atom#title, even if they have somewhat the same meaning) - https://doi.org/10.5063/SCHEMA/CODEMETA-2.0#author Args: deposit_client: Deposit client username, name: Software name external_id: External identifier (slug) or generated one authors: List of author names Returns: metadata xml string """ import xmltodict # generate a metadata file with the minimum required metadata document = { "atom:entry": { "@xmlns:atom": "http://www.w3.org/2005/Atom", "@xmlns:codemeta": "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0", "atom:updated": datetime.now(tz=timezone.utc), # mandatory, cf. docstring "atom:author": deposit_client, # mandatory, cf. docstring "atom:title": name, # mandatory, cf. docstring "codemeta:name": name, # mandatory, cf. docstring "codemeta:author": [ # mandatory, cf. docstring {"codemeta:name": author_name} for author_name in authors ], }, } if external_id: document["atom:entry"]["codemeta:identifier"] = external_id logging.debug("Atom entry dict to generate as xml: %s", document) return xmltodict.unparse(document, pretty=True) def _collection(client: PublicApiDepositClient) -> str: """Retrieve the client's collection """ # retrieve user's collection sd_content = client.service_document() if "error" in sd_content: raise InputError("Service document retrieval: %s" % (sd_content["error"],)) collection = sd_content["app:service"]["app:workspace"]["app:collection"][ "sword:name" ] return collection def client_command_parse_input( client, username: str, archive: Optional[str], metadata: Optional[str], collection: Optional[str], slug: Optional[str], partial: bool, deposit_id: Optional[int], swhid: Optional[str], replace: bool, url: str, name: Optional[str], authors: List[str], temp_dir: str, ) -> Dict[str, Any]: """Parse the client subcommand options and make sure the combination is acceptable*. If not, an InputError exception is raised explaining the issue. By acceptable, we mean: - A multipart deposit (create or update) requires: - an existing software archive - an existing metadata file or author(s) and name provided in params - A binary deposit (create/update) requires an existing software archive - A metadata deposit (create/update) requires an existing metadata file or author(s) and name provided in params - A deposit update requires a deposit_id This will not prevent all failure cases though. The remaining errors are already dealt with by the underlying api client. Raises: InputError explaining the user input related issue MaintenanceError explaining the api status Returns: dict with the following keys: "archive": the software archive to deposit "username": username "metadata": the metadata file to deposit "collection": the user's collection under which to put the deposit "slug": the slug or external id identifying the deposit to make "in_progress": if the deposit is partial or not "url": deposit's server main entry point "deposit_id": optional deposit identifier "swhid": optional deposit swhid "replace": whether the given deposit is to be replaced or not """ if not metadata: if name and authors: metadata_path = os.path.join(temp_dir, "metadata.xml") logging.debug("Temporary file: %s", metadata_path) metadata_xml = generate_metadata(username, name, slug, authors) logging.debug("Metadata xml generated: %s", metadata_xml) with open(metadata_path, "w") as f: f.write(metadata_xml) metadata = metadata_path elif archive is not None and not partial and not deposit_id: # If we meet all the following conditions: # * this is not an archive-only deposit request # * it is not part of a multipart deposit (either create/update # or finish) # * it misses either name or authors raise InputError( "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided. " ) elif name or authors: # If we are generating metadata, then all mandatory metadata # must be present raise InputError( "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided." ) else: # TODO: this is a multipart deposit, we might want to check that # metadata are deposited at some point pass elif name or authors: raise InputError( "Using --metadata flag is incompatible with both " "--author and --name (Those are used to generate one metadata file)." ) if not archive and not metadata: raise InputError( "Please provide an actionable command. See --help for more information" ) if replace and not deposit_id: raise InputError("To update an existing deposit, you must provide its id") if not collection: collection = _collection(client) return { "archive": archive, "username": username, "metadata": metadata, "collection": collection, "slug": slug, "in_progress": partial, "url": url, "deposit_id": deposit_id, "swhid": swhid, "replace": replace, } def _subdict(d: Dict[str, Any], keys: Collection[str]) -> Dict[str, Any]: "return a dict from d with only given keys" return {k: v for k, v in d.items() if k in keys} @deposit.command() @click.option("--username", required=True, help="(Mandatory) User's name") @click.option( "--password", required=True, help="(Mandatory) User's associated password" ) @click.option( "--archive", type=click.Path(exists=True), help="(Optional) Software archive to deposit", ) @click.option( "--metadata", type=click.Path(exists=True), help=( "(Optional) Path to xml metadata file. If not provided, " "this will use a file named .metadata.xml" ), ) # noqa @click.option( "--archive-deposit/--no-archive-deposit", default=False, help="Deprecated (ignored)", ) @click.option( "--metadata-deposit/--no-metadata-deposit", default=False, help="Deprecated (ignored)", ) @click.option( "--collection", help="(Optional) User's collection. If not provided, this will be fetched.", ) # noqa @click.option( "--slug", help=( "(Optional) External system information identifier. " "If not provided, it will be generated" ), ) # noqa @click.option( "--partial/--no-partial", default=False, help=( "(Optional) The deposit will be partial, other deposits " "will have to take place to finalize it." ), ) # noqa @click.option( "--deposit-id", default=None, help="(Optional) Update an existing partial deposit with its identifier", ) # noqa @click.option( "--swhid", default=None, help="(Optional) Update existing completed deposit (status done) with new metadata", ) @click.option( "--replace/--no-replace", default=False, help="(Optional) Update by replacing existing metadata to a deposit", ) # noqa @click.option( "--url", default="https://deposit.softwareheritage.org", help=( "(Optional) Deposit server api endpoint. By default, " "https://deposit.softwareheritage.org/1" ), ) # noqa @click.option("--verbose/--no-verbose", default=False, help="Verbose mode") @click.option("--name", help="Software name") @click.option( "--author", multiple=True, help="Software author(s), this can be repeated as many times" " as there are authors", ) @click.option( "-f", "--format", "output_format", default="logging", type=click.Choice(["logging", "yaml", "json"]), help="Output format results.", ) @click.pass_context def upload( ctx, username: str, password: str, archive: Optional[str], metadata: Optional[str], archive_deposit: bool, metadata_deposit: bool, collection: Optional[str], slug: Optional[str], partial: bool, deposit_id: Optional[int], swhid: Optional[str], replace: bool, url: str, verbose: bool, name: Optional[str], author: List[str], output_format: Optional[str], ): """Software Heritage Public Deposit Client Create/Update deposit through the command line. More documentation can be found at https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html. """ import tempfile from swh.deposit.client import PublicApiDepositClient if archive_deposit or metadata_deposit: warnings.warn( '"archive_deposit" and "metadata_deposit" option arguments are ' "deprecated and have no effect; simply do not provide the archive " "for a metadata-only deposit, and do not provide a metadata for a" "archive-only deposit.", DeprecationWarning, ) url = _url(url) client = PublicApiDepositClient(url=url, auth=(username, password)) with tempfile.TemporaryDirectory() as temp_dir: with trap_and_report_exceptions(): logger.debug("Parsing cli options") config = client_command_parse_input( client, username, archive, metadata, collection, slug, partial, deposit_id, swhid, replace, url, name, author, temp_dir, ) if verbose: logger.info("Parsed configuration: %s", config) keys = ["archive", "collection", "in_progress", "metadata", "slug"] if config["deposit_id"]: keys += ["deposit_id", "replace", "swhid"] data = client.deposit_update(**_subdict(config, keys)) else: data = client.deposit_create(**_subdict(config, keys)) print_result(data, output_format) @deposit.command() @click.option( "--url", default="https://deposit.softwareheritage.org", help="(Optional) Deposit server api endpoint. By default, " "https://deposit.softwareheritage.org/1", ) @click.option("--username", required=True, help="(Mandatory) User's name") @click.option( "--password", required=True, help="(Mandatory) User's associated password" ) @click.option("--deposit-id", default=None, required=True, help="Deposit identifier.") @click.option( "-f", "--format", "output_format", default="logging", type=click.Choice(["logging", "yaml", "json"]), help="Output format results.", ) @click.pass_context def status(ctx, url, username, password, deposit_id, output_format): """Deposit's status """ from swh.deposit.client import PublicApiDepositClient url = _url(url) logger.debug("Status deposit") with trap_and_report_exceptions(): client = PublicApiDepositClient(url=url, auth=(username, password)) collection = _collection(client) print_result( client.deposit_status(collection=collection, deposit_id=deposit_id), output_format, ) def print_result(data: Dict[str, Any], output_format: Optional[str]) -> None: """Display the result data into a dedicated output format. """ import json import yaml if output_format == "json": click.echo(json.dumps(data)) elif output_format == "yaml": click.echo(yaml.dump(data)) else: logger.info(data) + + +@deposit.command("metadata-only") +@click.option( + "--url", + default="https://deposit.softwareheritage.org", + help="(Optional) Deposit server api endpoint. By default, " + "https://deposit.softwareheritage.org/1", +) +@click.option("--username", required=True, help="(Mandatory) User's name") +@click.option( + "--password", required=True, help="(Mandatory) User's associated password" +) +@click.option( + "--metadata", + "metadata_path", + type=click.Path(exists=True), + required=True, + help="Path to xml metadata file", +) +@click.option( + "-f", + "--format", + "output_format", + default="logging", + type=click.Choice(["logging", "yaml", "json"]), + help="Output format results.", +) +@click.pass_context +def metadata_only(ctx, url, username, password, metadata_path, output_format): + """Deposit metadata only upload + + """ + from swh.deposit.client import PublicApiDepositClient + from swh.deposit.utils import parse_swh_reference, parse_xml + + # Parse to check for a swhid presence within the metadata file + with open(metadata_path, "r") as f: + metadata_raw = f.read() + actual_swhid = parse_swh_reference(parse_xml(metadata_raw)) + + if not actual_swhid: + raise InputError("A SWHID must be provided for a metadata-only deposit") + + with trap_and_report_exceptions(): + client = PublicApiDepositClient(url=_url(url), auth=(username, password)) + collection = _collection(client) + result = client.deposit_metadata_only(collection, metadata_path) + + print_result(result, output_format) diff --git a/swh/deposit/client.py b/swh/deposit/client.py index 62237a0d..b72c8e2a 100644 --- a/swh/deposit/client.py +++ b/swh/deposit/client.py @@ -1,714 +1,744 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module in charge of defining an swh-deposit client """ from abc import ABCMeta, abstractmethod import hashlib import logging import os from typing import Any, Dict, Optional, Tuple from urllib.parse import urljoin import warnings import requests from swh.core.config import load_from_envvar from swh.deposit import __version__ as swh_deposit_version from swh.deposit.utils import parse_xml logger = logging.getLogger(__name__) def compute_unified_information( collection: str, in_progress: bool, slug: str, *, filepath: Optional[str] = None, swhid: Optional[str] = None, **kwargs, ) -> Dict[str, Any]: """Given a filepath, compute necessary information on that file. Args: collection: Deposit collection in_progress: do we finalize the deposit? slug: external id to use filepath: Path to the file to compute the necessary information out of swhid: Deposit swhid if any Returns: dict with keys: 'slug': external id to use 'in_progress': do we finalize the deposit? 'content-type': content type associated 'md5sum': md5 sum 'filename': filename 'filepath': filepath 'swhid': deposit swhid """ result: Dict[str, Any] = { "slug": slug, "in_progress": in_progress, "swhid": swhid, } content_type: Optional[str] = None md5sum: Optional[str] = None if filepath: filename = os.path.basename(filepath) md5sum = hashlib.md5(open(filepath, "rb").read()).hexdigest() extension = filename.split(".")[-1] if "zip" in extension: content_type = "application/zip" else: content_type = "application/x-tar" result.update( { "content-type": content_type, "md5sum": md5sum, "filename": filename, "filepath": filepath, } ) return result class MaintenanceError(ValueError): """Informational maintenance error exception """ pass def handle_deprecated_config(config: Dict) -> Tuple[str, Optional[Tuple[str, str]]]: warnings.warn( '"config" argument is deprecated, please ' 'use "url" and "auth" arguments instead; note that "auth" ' "expects now a couple (username, password) and not a dict.", DeprecationWarning, ) url: str = config["url"] auth: Optional[Tuple[str, str]] = None if config.get("auth"): auth = (config["auth"]["username"], config["auth"]["password"]) return (url, auth) class BaseApiDepositClient: """Deposit client base class """ def __init__( self, config: Optional[Dict] = None, url: Optional[str] = None, auth: Optional[Tuple[str, str]] = None, ): if not url and not config: config = load_from_envvar() if config: url, auth = handle_deprecated_config(config) # needed to help mypy not be fooled by the Optional nature of url assert url is not None self.base_url = url.strip("/") + "/" self.auth = auth self.session = requests.Session() if auth: self.session.auth = auth self.session.headers.update( {"user-agent": f"swh-deposit/{swh_deposit_version}"} ) def do(self, method, url, *args, **kwargs): """Internal method to deal with requests, possibly with basic http authentication. Args: method (str): supported http methods as in self._methods' keys Returns: The request's execution """ full_url = urljoin(self.base_url, url.lstrip("/")) return self.session.request(method, full_url, *args, **kwargs) class PrivateApiDepositClient(BaseApiDepositClient): """Private API deposit client to: - read a given deposit's archive(s) - read a given deposit's metadata - update a given deposit's status """ def archive_get(self, archive_update_url, archive): """Retrieve the archive from the deposit to a local directory. Args: archive_update_url (str): The full deposit archive(s)'s raw content to retrieve locally archive (str): the local archive's path where to store the raw content Returns: The archive path to the local archive to load. Or None if any problem arose. """ r = self.do("get", archive_update_url, stream=True) if r.ok: with open(archive, "wb") as f: for chunk in r.iter_content(): f.write(chunk) return archive msg = "Problem when retrieving deposit archive at %s" % (archive_update_url,) logger.error(msg) raise ValueError(msg) def metadata_get(self, metadata_url): """Retrieve the metadata information on a given deposit. Args: metadata_url (str): The full deposit metadata url to retrieve locally Returns: The dictionary of metadata for that deposit or None if any problem arose. """ r = self.do("get", metadata_url) if r.ok: return r.json() msg = "Problem when retrieving metadata at %s" % metadata_url logger.error(msg) raise ValueError(msg) def status_update( self, update_status_url, status, revision_id=None, directory_id=None, origin_url=None, ): """Update the deposit's status. Args: update_status_url (str): the full deposit's archive status (str): The status to update the deposit with revision_id (str/None): the revision's identifier to update to directory_id (str/None): the directory's identifier to update to origin_url (str/None): deposit's associated origin url """ payload = {"status": status} if revision_id: payload["revision_id"] = revision_id if directory_id: payload["directory_id"] = directory_id if origin_url: payload["origin_url"] = origin_url self.do("put", update_status_url, json=payload) def check(self, check_url): """Check the deposit's associated data (metadata, archive(s)) Args: check_url (str): the full deposit's check url """ r = self.do("get", check_url) if r.ok: data = r.json() return data["status"] msg = "Problem when checking deposit %s" % check_url logger.error(msg) raise ValueError(msg) class BaseDepositClient(BaseApiDepositClient, metaclass=ABCMeta): """Base Deposit client to access the public api. """ def __init__( self, config=None, url=None, auth=None, error_msg=None, empty_result={} ): super().__init__(url=url, auth=auth, config=config) self.error_msg = error_msg self.empty_result = empty_result @abstractmethod def compute_url(self, *args, **kwargs): """Compute api url endpoint to query.""" pass @abstractmethod def compute_method(self, *args, **kwargs): """Http method to use on the url""" pass @abstractmethod def parse_result_ok(self, xml_content): """Given an xml result from the api endpoint, parse it and returns a dict. """ pass def compute_information(self, *args, **kwargs) -> Dict[str, Any]: """Compute some more information given the inputs (e.g http headers, ...) """ return {} def parse_result_error(self, xml_content): """Given an error response in xml, parse it into a dict. Returns: dict with following keys: 'error': The error message 'detail': Some more detail about the error if any """ data = parse_xml(xml_content) return { "summary": data["summary"], "detail": data["detail"], "sword:verboseDescription": data["sword:verboseDescription"], } def do_execute(self, method, url, info): """Execute the http query to url using method and info information. By default, execute a simple query to url with the http method. Override this in daughter class to improve the default behavior if needed. """ return self.do(method, url) def execute(self, *args, **kwargs) -> Dict[str, Any]: """Main endpoint to prepare and execute the http query to the api. Raises: MaintenanceError if some api maintenance is happening. Returns: Dict of computed api data """ url = self.compute_url(*args, **kwargs) method = self.compute_method(*args, **kwargs) info = self.compute_information(*args, **kwargs) try: r = self.do_execute(method, url, info) except Exception as e: msg = self.error_msg % (url, e) r = self.empty_result r.update( {"error": msg,} ) return r else: if r.ok: if int(r.status_code) == 204: # 204 returns no body return {"status": r.status_code} else: return self.parse_result_ok(r.text) else: error = self.parse_result_error(r.text) empty = self.empty_result error.update(empty) if r.status_code == 503: summary = error.get("summary") detail = error.get("sword:verboseDescription") # Maintenance error if summary and detail: raise MaintenanceError(f"{summary}: {detail}") error.update( {"status": r.status_code,} ) return error class ServiceDocumentDepositClient(BaseDepositClient): """Service Document information retrieval. """ def __init__(self, config=None, url=None, auth=None): super().__init__( url=url, auth=auth, config=config, error_msg="Service document failure at %s: %s", empty_result={"collection": None}, ) def compute_url(self, *args, **kwargs): return "/servicedocument/" def compute_method(self, *args, **kwargs): return "get" def parse_result_ok(self, xml_content): """Parse service document's success response. """ return parse_xml(xml_content) class StatusDepositClient(BaseDepositClient): """Status information on a deposit. """ def __init__(self, config=None, url=None, auth=None): super().__init__( url=url, auth=auth, config=config, error_msg="Status check failure at %s: %s", empty_result={ "deposit_status": None, "deposit_status_detail": None, "deposit_swh_id": None, }, ) def compute_url(self, collection, deposit_id): return "/%s/%s/status/" % (collection, deposit_id) def compute_method(self, *args, **kwargs): return "get" def parse_result_ok(self, xml_content): """Given an xml content as string, returns a deposit dict. """ data = parse_xml(xml_content) keys = [ "deposit_id", "deposit_status", "deposit_status_detail", "deposit_swh_id", "deposit_swh_id_context", "deposit_external_id", ] return {key: data.get("swh:" + key) for key in keys} class BaseCreateDepositClient(BaseDepositClient): """Deposit client base class to post new deposit. """ def __init__(self, config=None, url=None, auth=None): super().__init__( url=url, auth=auth, config=config, error_msg="Post Deposit failure at %s: %s", empty_result={"swh:deposit_id": None, "swh:deposit_status": None,}, ) def compute_url(self, collection, *args, **kwargs): return "/%s/" % collection def compute_method(self, *args, **kwargs): return "post" def parse_result_ok(self, xml_content): """Given an xml content as string, returns a deposit dict. """ data = parse_xml(xml_content) keys = [ "deposit_id", "deposit_status", "deposit_status_detail", "deposit_date", ] return {key: data.get("swh:" + key) for key in keys} def compute_headers(self, info: Dict[str, Any]) -> Dict[str, Any]: return info def do_execute(self, method, url, info): with open(info["filepath"], "rb") as f: return self.do(method, url, data=f, headers=info["headers"]) class CreateArchiveDepositClient(BaseCreateDepositClient): """Post an archive (binary) deposit client.""" def compute_headers(self, info): headers = { "CONTENT_MD5": info["md5sum"], "IN-PROGRESS": str(info["in_progress"]), "CONTENT-TYPE": info["content-type"], "CONTENT-DISPOSITION": "attachment; filename=%s" % (info["filename"],), } if "slug" in info: headers["SLUG"] = info["slug"] return headers def compute_information(self, *args, **kwargs) -> Dict[str, Any]: info = compute_unified_information( *args, filepath=kwargs["archive_path"], **kwargs ) info["headers"] = self.compute_headers(info) return info class UpdateArchiveDepositClient(CreateArchiveDepositClient): """Update (add/replace) an archive (binary) deposit client.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return "/%s/%s/media/" % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return "put" if replace else "post" class CreateMetadataDepositClient(BaseCreateDepositClient): """Post a metadata deposit client.""" def compute_headers(self, info): headers = { "IN-PROGRESS": str(info["in_progress"]), "CONTENT-TYPE": "application/atom+xml;type=entry", } if "slug" in info: headers["SLUG"] = info["slug"] return headers def compute_information(self, *args, **kwargs) -> Dict[str, Any]: info = compute_unified_information( *args, filepath=kwargs["metadata_path"], **kwargs ) info["headers"] = self.compute_headers(info) return info class UpdateMetadataOnPartialDepositClient(CreateMetadataDepositClient): """Update (add/replace) metadata on partial deposit scenario.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return f"/{collection}/{deposit_id}/metadata/" def compute_method(self, *args, replace: bool = False, **kwargs) -> str: return "put" if replace else "post" class UpdateMetadataOnDoneDepositClient(UpdateMetadataOnPartialDepositClient): """Update metadata on "done" deposit. This requires the deposit swhid.""" def compute_headers(self, info: Dict[str, Any]) -> Dict[str, Any]: return { "CONTENT-TYPE": "application/atom+xml;type=entry", "X_CHECK_SWHID": info["swhid"], } def compute_method(self, *args, **kwargs) -> str: return "put" +class CreateMetadataOnlyDepositClient(BaseCreateDepositClient): + """Create metadata-only deposit.""" + + def compute_information(self, *args, **kwargs) -> Dict[str, Any]: + return { + "headers": {"CONTENT-TYPE": "application/atom+xml;type=entry",}, + "filepath": kwargs["metadata_path"], + } + + def parse_result_ok(self, xml_content): + """Given an xml content as string, returns a deposit dict. + + """ + data = parse_xml(xml_content) + keys = [ + "deposit_id", + "deposit_status", + "deposit_date", + ] + return {key: data.get("swh:" + key) for key in keys} + + class CreateMultipartDepositClient(BaseCreateDepositClient): """Create a multipart deposit client.""" def _multipart_info(self, info, info_meta): files = [ ( "file", (info["filename"], open(info["filepath"], "rb"), info["content-type"]), ), ( "atom", ( info_meta["filename"], open(info_meta["filepath"], "rb"), "application/atom+xml", ), ), ] headers = { "CONTENT_MD5": info["md5sum"], "IN-PROGRESS": str(info["in_progress"]), } if "slug" in info: headers["SLUG"] = info["slug"] return files, headers def compute_information(self, *args, **kwargs) -> Dict[str, Any]: info = compute_unified_information(*args, filepath=kwargs["archive_path"],) info_meta = compute_unified_information( *args, filepath=kwargs["metadata_path"], ) files, headers = self._multipart_info(info, info_meta) return {"files": files, "headers": headers} def do_execute(self, method, url, info): return self.do(method, url, files=info["files"], headers=info["headers"]) class UpdateMultipartDepositClient(CreateMultipartDepositClient): """Update a multipart deposit client.""" def compute_url(self, collection, *args, deposit_id=None, **kwargs): return "/%s/%s/metadata/" % (collection, deposit_id) def compute_method(self, *args, replace=False, **kwargs): return "put" if replace else "post" class PublicApiDepositClient(BaseApiDepositClient): """Public api deposit client.""" def service_document(self): """Retrieve service document endpoint's information.""" return ServiceDocumentDepositClient(url=self.base_url, auth=self.auth).execute() def deposit_status(self, collection: str, deposit_id: int): """Retrieve status information on a deposit.""" return StatusDepositClient(url=self.base_url, auth=self.auth).execute( collection, deposit_id ) def deposit_create( self, collection: str, slug: Optional[str], archive: Optional[str] = None, metadata: Optional[str] = None, in_progress: bool = False, ): """Create a new deposit (archive, metadata, both as multipart).""" if archive and not metadata: return CreateArchiveDepositClient( url=self.base_url, auth=self.auth ).execute(collection, in_progress, slug, archive_path=archive) elif not archive and metadata: return CreateMetadataDepositClient( url=self.base_url, auth=self.auth ).execute(collection, in_progress, slug, metadata_path=metadata) else: return CreateMultipartDepositClient( url=self.base_url, auth=self.auth ).execute( collection, in_progress, slug, archive_path=archive, metadata_path=metadata, ) def deposit_update( self, collection: str, deposit_id: int, slug: Optional[str], archive: Optional[str] = None, metadata: Optional[str] = None, in_progress: bool = False, replace: bool = False, swhid: Optional[str] = None, ): """Update (add/replace) existing deposit (archive, metadata, both).""" r = self.deposit_status(collection, deposit_id) if "error" in r: return r status = r["deposit_status"] if swhid is None and status != "partial": return { "error": "You can only act on deposit with status 'partial'", "detail": f"The deposit {deposit_id} has status '{status}'", "deposit_status": status, "deposit_id": deposit_id, } if swhid is not None and status != "done": return { "error": "You can only update metadata on deposit with status 'done'", "detail": f"The deposit {deposit_id} has status '{status}'", "deposit_status": status, "deposit_id": deposit_id, } if archive and not metadata: r = UpdateArchiveDepositClient(url=self.base_url, auth=self.auth).execute( collection, in_progress, slug, deposit_id=deposit_id, archive_path=archive, replace=replace, ) elif not archive and metadata and swhid is None: r = UpdateMetadataOnPartialDepositClient( url=self.base_url, auth=self.auth ).execute( collection, in_progress, slug, deposit_id=deposit_id, metadata_path=metadata, replace=replace, ) elif not archive and metadata and swhid is not None: r = UpdateMetadataOnDoneDepositClient( url=self.base_url, auth=self.auth ).execute( collection, in_progress, slug, deposit_id=deposit_id, metadata_path=metadata, swhid=swhid, ) else: r = UpdateMultipartDepositClient(url=self.base_url, auth=self.auth).execute( collection, in_progress, slug, deposit_id=deposit_id, archive_path=archive, metadata_path=metadata, replace=replace, ) if "error" in r: return r return self.deposit_status(collection, deposit_id) + + def deposit_metadata_only( + self, collection: str, metadata: Optional[str] = None, + ): + assert metadata is not None + return CreateMetadataOnlyDepositClient( + url=self.base_url, auth=self.auth + ).execute(collection, metadata_path=metadata) diff --git a/swh/deposit/tests/cli/test_client.py b/swh/deposit/tests/cli/test_client.py index d765c442..89d59498 100644 --- a/swh/deposit/tests/cli/test_client.py +++ b/swh/deposit/tests/cli/test_client.py @@ -1,639 +1,740 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import ast from collections import OrderedDict import contextlib import json import logging import os from unittest.mock import MagicMock import pytest import yaml from swh.deposit.api.checks import check_metadata from swh.deposit.cli import deposit as cli from swh.deposit.cli.client import ( InputError, _collection, _url, generate_metadata, generate_slug, ) from swh.deposit.client import MaintenanceError, PublicApiDepositClient from swh.deposit.parsers import parse_xml +from swh.model.exceptions import ValidationError from ..conftest import TEST_USER @pytest.fixture def datadir(request): """Override default datadir to target main test datadir""" return os.path.join(os.path.dirname(str(request.fspath)), "../data") @pytest.fixture def slug(): return generate_slug() @pytest.fixture def patched_tmp_path(tmp_path, mocker): mocker.patch( "tempfile.TemporaryDirectory", return_value=contextlib.nullcontext(str(tmp_path)), ) return tmp_path @pytest.fixture def client_mock_api_down(mocker, slug): """A mock client whose connection with api fails due to maintenance issue """ mock_client = MagicMock() mocker.patch("swh.deposit.client.PublicApiDepositClient", return_value=mock_client) mock_client.service_document.side_effect = MaintenanceError( "Database backend maintenance: Temporarily unavailable, try again later." ) return mock_client def test_cli_url(): assert _url("http://deposit") == "http://deposit/1" assert _url("https://other/1") == "https://other/1" def test_cli_collection_error(): mock_client = MagicMock() mock_client.service_document.return_value = {"error": "something went wrong"} with pytest.raises(InputError) as e: _collection(mock_client) assert "Service document retrieval: something went wrong" == str(e.value) def test_cli_collection_ok(requests_mock_datadir): client = PublicApiDepositClient( url="https://deposit.swh.test/1", auth=("test", "test") ) collection_name = _collection(client) assert collection_name == "test" def test_cli_collection_ko_because_downtime(): mock_client = MagicMock() mock_client.service_document.side_effect = MaintenanceError("downtime") with pytest.raises(MaintenanceError, match="downtime"): _collection(mock_client) def test_cli_deposit_with_server_down_for_maintenance( sample_archive, caplog, client_mock_api_down, slug, patched_tmp_path, cli_runner ): """ Deposit failure due to maintenance down time should be explicit """ # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", ], ) # fmt: on assert result.exit_code == 1, result.output assert result.output == "" down_for_maintenance_log_record = ( "swh.deposit.cli.client", logging.ERROR, "Database backend maintenance: Temporarily unavailable, try again later.", ) assert down_for_maintenance_log_record in caplog.record_tuples client_mock_api_down.service_document.assert_called_once_with() def test_cli_client_generate_metadata_ok(slug): """Generated metadata is well formed and pass service side metadata checks """ actual_metadata_xml = generate_metadata( "deposit-client", "project-name", "external-id", authors=["some", "authors"] ) actual_metadata = dict(parse_xml(actual_metadata_xml)) assert actual_metadata["atom:author"] == "deposit-client" assert actual_metadata["atom:title"] == "project-name" assert actual_metadata["atom:updated"] is not None assert actual_metadata["codemeta:name"] == "project-name" assert actual_metadata["codemeta:identifier"] == "external-id" assert actual_metadata["codemeta:author"] == [ OrderedDict([("codemeta:name", "some")]), OrderedDict([("codemeta:name", "authors")]), ] checks_ok, detail = check_metadata(actual_metadata) assert checks_ok is True assert detail is None def test_cli_single_minimal_deposit( sample_archive, slug, patched_tmp_path, requests_mock_datadir, cli_runner ): """ This ensure a single deposit upload through the cli is fine, cf. https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#single-deposit """ # noqa metadata_path = os.path.join(patched_tmp_path, "metadata.xml") # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", "--slug", slug, "--format", "json", ], ) # fmt: on assert result.exit_code == 0, result.output assert json.loads(result.output) == { "deposit_id": "615", "deposit_status": "partial", "deposit_status_detail": None, "deposit_date": "Oct. 8, 2020, 4:57 p.m.", } with open(metadata_path) as fd: actual_metadata = dict(parse_xml(fd.read())) assert actual_metadata["atom:author"] == TEST_USER["username"] assert actual_metadata["codemeta:name"] == "test-project" assert actual_metadata["atom:title"] == "test-project" assert actual_metadata["atom:updated"] is not None assert actual_metadata["codemeta:identifier"] == slug assert actual_metadata["codemeta:author"] == OrderedDict( [("codemeta:name", "Jane Doe")] ) def test_cli_validation_metadata( sample_archive, caplog, patched_tmp_path, cli_runner, slug ): """Multiple metadata flags scenario (missing, conflicts) properly fails the calls """ metadata_path = os.path.join(patched_tmp_path, "metadata.xml") with open(metadata_path, "a"): pass # creates the file for flag_title_or_name, author_or_name in [ ("--author", "no one"), ("--name", "test-project"), ]: # Test missing author then missing name # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--slug", slug, flag_title_or_name, author_or_name, ], ) # fmt: on assert result.exit_code == 1, f"unexpected result: {result.output}" assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided. " ), ) assert expected_error_log_record in caplog.record_tuples # Clear mocking state caplog.clear() # incompatible flags: Test both --metadata and --author, then --metadata and # --name # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--deposit-id", 666, "--archive", sample_archive["path"], "--slug", slug, ], ) # fmt: on assert result.exit_code == 1, f"unexpected result: {result.output}" assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "For metadata deposit request, either a metadata file with " "--metadata or both --author and --name must be provided." ), ) assert expected_error_log_record in caplog.record_tuples # Clear mocking state caplog.clear() # incompatible flags check (Test both --metadata and --author, # then --metadata and --name) # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--metadata", metadata_path, "--author", "Jane Doe", "--slug", slug, ], ) # fmt: on assert result.exit_code == 1, result.output assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "Using --metadata flag is incompatible with both " "--author and --name (Those are used to generate one metadata file)." ), ) assert expected_error_log_record in caplog.record_tuples caplog.clear() def test_cli_validation_no_actionable_command(caplog, cli_runner): """Multiple metadata flags scenario (missing, conflicts) properly fails the calls """ # no actionable command # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--partial", ], ) # fmt: on assert result.exit_code == 1, result.output assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "Please provide an actionable command. See --help for more information" ), ) assert expected_error_log_record in caplog.record_tuples def test_cli_validation_replace_with_no_deposit_id_fails( sample_archive, caplog, patched_tmp_path, requests_mock_datadir, datadir, cli_runner ): """--replace flags require --deposit-id otherwise fails """ metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml") # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--archive", sample_archive["path"], "--replace", ], ) # fmt: on assert result.exit_code == 1, result.output assert result.output == "" expected_error_log_record = ( "swh.deposit.cli.client", logging.ERROR, ( "Problem during parsing options: " "To update an existing deposit, you must provide its id" ), ) assert expected_error_log_record in caplog.record_tuples def test_cli_single_deposit_slug_generation( sample_archive, patched_tmp_path, requests_mock_datadir, cli_runner ): """Single deposit scenario without providing the slug, it should not be generated. """ metadata_path = os.path.join(patched_tmp_path, "metadata.xml") # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", "https://deposit.swh.test/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--archive", sample_archive["path"], "--author", "Jane Doe", "--format", "json", ], ) # fmt: on assert result.exit_code == 0, result.output assert json.loads(result.output) == { "deposit_id": "615", "deposit_status": "partial", "deposit_status_detail": None, "deposit_date": "Oct. 8, 2020, 4:57 p.m.", } with open(metadata_path) as fd: metadata_xml = fd.read() actual_metadata = dict(parse_xml(metadata_xml)) assert "codemeta:identifier" not in actual_metadata def test_cli_multisteps_deposit( sample_archive, datadir, slug, requests_mock_datadir, cli_runner ): """ First deposit a partial deposit (no metadata, only archive), then update the metadata part. https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#multisteps-deposit """ # noqa api_url = "https://deposit.test.metadata/1" deposit_id = 666 # Create a partial deposit with only 1 archive # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", api_url, "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--slug", slug, "--format", "json", "--partial", ], ) # fmt: on assert result.exit_code == 0, f"unexpected output: {result.output}" actual_deposit = json.loads(result.output) assert actual_deposit == { "deposit_id": str(deposit_id), "deposit_status": "partial", "deposit_status_detail": None, "deposit_date": "Oct. 8, 2020, 4:57 p.m.", } # Update the partial deposit with only 1 archive # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", api_url, "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--archive", sample_archive["path"], "--deposit-id", deposit_id, "--slug", slug, "--format", "json", "--partial", # in-progress: True, because remains the metadata to upload ], ) # fmt: on assert result.exit_code == 0, f"unexpected output: {result.output}" assert result.output is not None actual_deposit = json.loads(result.output) # deposit update scenario actually returns a deposit status dict assert actual_deposit["deposit_id"] == str(deposit_id) assert actual_deposit["deposit_status"] == "partial" # Update the partial deposit with only some metadata (and then finalize it) # https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html#add-content-or-metadata-to-the-deposit metadata_path = os.path.join(datadir, "atom", "entry-data-deposit-binary.xml") # Update deposit with metadata # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", api_url, "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--metadata", metadata_path, "--deposit-id", deposit_id, "--slug", slug, "--format", "json", ], # this time, ^ we no longer flag it to partial, so the status changes to # in-progress false ) # fmt: on assert result.exit_code == 0, f"unexpected output: {result.output}" assert result.output is not None actual_deposit = json.loads(result.output) # deposit update scenario actually returns a deposit status dict assert actual_deposit["deposit_id"] == str(deposit_id) # FIXME: should be "deposited" but current limitation in the # requests_mock_datadir_visits use, cannot find a way to make it work right now assert actual_deposit["deposit_status"] == "partial" @pytest.mark.parametrize( "output_format,callable_fn", [ ("json", json.loads), ("yaml", yaml.safe_load), ( "logging", ast.literal_eval, ), # not enough though, the caplog fixture is needed ], ) def test_cli_deposit_status_with_output_format( output_format, callable_fn, datadir, slug, requests_mock_datadir, caplog, cli_runner ): """Check deposit status cli with all possible output formats (json, yaml, logging). """ api_url_basename = "deposit.test.status" deposit_id = 1033 expected_deposit_status = { "deposit_id": str(deposit_id), "deposit_status": "done", "deposit_status_detail": ( "The deposit has been successfully loaded into the " "Software Heritage archive" ), "deposit_swh_id": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea", "deposit_swh_id_context": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/", # noqa "deposit_external_id": "check-deposit-2020-10-08T13:52:34.509655", } # fmt: off result = cli_runner.invoke( cli, [ "status", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--deposit-id", deposit_id, "--format", output_format, ], ) # fmt: on assert result.exit_code == 0, f"unexpected output: {result.output}" if output_format == "logging": assert len(caplog.record_tuples) == 1 # format: (, , ) _, _, result_output = caplog.record_tuples[0] else: result_output = result.output actual_deposit = callable_fn(result_output) assert actual_deposit == expected_deposit_status def test_cli_update_metadata_with_swhid_on_completed_deposit( datadir, requests_mock_datadir, cli_runner ): """Update new metadata on a completed deposit (status done) is ok """ api_url_basename = "deposit.test.updateswhid" deposit_id = 123 expected_deposit_status = { "deposit_external_id": "check-deposit-2020-10-08T13:52:34.509655", "deposit_id": str(deposit_id), "deposit_status": "done", "deposit_status_detail": ( "The deposit has been successfully loaded into the " "Software Heritage archive" ), "deposit_swh_id": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea", "deposit_swh_id_context": "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/", # noqa } assert expected_deposit_status["deposit_status"] == "done" assert expected_deposit_status["deposit_swh_id"] is not None # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--author", "John Doe", "--deposit-id", deposit_id, "--swhid", expected_deposit_status["deposit_swh_id"], "--format", "json", ], ) # fmt: on assert result.exit_code == 0, result.output actual_deposit_status = json.loads(result.output) assert "error" not in actual_deposit_status assert actual_deposit_status == expected_deposit_status def test_cli_update_metadata_with_swhid_on_other_status_deposit( datadir, requests_mock_datadir, cli_runner ): """Update new metadata with swhid on other deposit status is not possible """ api_url_basename = "deposit.test.updateswhid" deposit_id = 321 # fmt: off result = cli_runner.invoke( cli, [ "upload", "--url", f"https://{api_url_basename}/1", "--username", TEST_USER["username"], "--password", TEST_USER["password"], "--name", "test-project", "--author", "John Doe", "--deposit-id", deposit_id, "--swhid", "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea", "--format", "json", ], ) # fmt: on assert result.exit_code == 0, result.output actual_result = json.loads(result.output) assert "error" in actual_result assert actual_result == { "error": "You can only update metadata on deposit with status 'done'", "detail": "The deposit 321 has status 'partial'", "deposit_status": "partial", "deposit_id": 321, } + + +def test_cli_metadata_only_deposit_full_metadata_file( + datadir, requests_mock_datadir, cli_runner, atom_dataset, tmp_path, +): + """Post metadata-only deposit through cli + + The metadata file posted by the client already contains the swhid + + """ + api_url_basename = "deposit.test.metadataonly" + swhid = "swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea" + metadata = atom_dataset["entry-data-with-swhid"].format(swhid=swhid) + metadata_path = os.path.join(tmp_path, "entry-data-with-swhid.xml") + with open(metadata_path, "w") as m: + m.write(metadata) + + expected_deposit_status = { + "deposit_id": "100", + "deposit_status": "done", + "deposit_date": "2020-10-08T13:52:34.509655", + } + + assert expected_deposit_status["deposit_status"] == "done" + + # fmt: off + result = cli_runner.invoke( + cli, + [ + "metadata-only", + "--url", f"https://{api_url_basename}/1", + "--username", TEST_USER["username"], + "--password", TEST_USER["password"], + "--metadata", metadata_path, + "--format", "json", + ], + ) + # fmt: on + assert result.exit_code == 0, result.output + actual_deposit_status = json.loads(result.output) + assert "error" not in actual_deposit_status + assert actual_deposit_status == expected_deposit_status + + +def test_cli_metadata_only_deposit_invalid_swhid( + datadir, requests_mock_datadir, cli_runner, atom_dataset, tmp_path, +): + """Post metadata-only deposit through cli with invalid swhid raises + + """ + api_url_basename = "deposit.test.metadataonly" + invalid_swhid = "ssh:2:sth:xxx" + metadata = atom_dataset["entry-data-with-swhid"].format(swhid=invalid_swhid) + metadata_path = os.path.join(tmp_path, "entry-data-with-swhid.xml") + with open(metadata_path, "w") as f: + f.write(metadata) + + # fmt: off + with pytest.raises(ValidationError, match="Invalid"): + cli_runner.invoke( + cli, + [ + "metadata-only", + "--url", f"https://{api_url_basename}/1", + "--username", TEST_USER["username"], + "--password", TEST_USER["password"], + "--metadata", metadata_path, + "--format", "json", + ], + catch_exceptions=False, + ) + # fmt: on + + +def test_cli_metadata_only_deposit_no_swhid( + datadir, requests_mock_datadir, cli_runner, atom_dataset, tmp_path, +): + """Post metadata-only deposit through cli with invalid swhid raises + + """ + api_url_basename = "deposit.test.metadataonly" + metadata = atom_dataset["entry-data-minimal"] + metadata_path = os.path.join(tmp_path, "entry-data-minimal.xml") + with open(metadata_path, "w") as f: + f.write(metadata) + + with pytest.raises(InputError, match="SWHID must be provided"): + cli_runner.invoke( + cli, + [ + "metadata-only", + "--url", f"https://{api_url_basename}/1", + "--username", TEST_USER["username"], + "--password", TEST_USER["password"], + "--metadata", metadata_path, + "--format", "json", + ], + catch_exceptions=False, + ) + # fmt: on diff --git a/swh/deposit/tests/data/https_deposit.test.metadataonly/1_servicedocument b/swh/deposit/tests/data/https_deposit.test.metadataonly/1_servicedocument new file mode 100644 index 00000000..f520d56c --- /dev/null +++ b/swh/deposit/tests/data/https_deposit.test.metadataonly/1_servicedocument @@ -0,0 +1,26 @@ + + + + 2.0 + 209715200 + + + The Software Heritage (SWH) Archive + + test Software Collection + application/zip + application/x-tar + Collection Policy + Software Heritage Archive + Collect, Preserve, Share + false + false + http://purl.org/net/sword/package/SimpleZip + https://deposit.test.metadataonly/1/test/ + test + + + diff --git a/swh/deposit/tests/data/https_deposit.test.metadataonly/1_test b/swh/deposit/tests/data/https_deposit.test.metadataonly/1_test new file mode 100644 index 00000000..2ef2a26d --- /dev/null +++ b/swh/deposit/tests/data/https_deposit.test.metadataonly/1_test @@ -0,0 +1,12 @@ + + + 100 + 2020-10-08T13:52:34.509655 + done + swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea + swh:1:dir:ef04a768181417fbc5eef4243e2507915f24deea;origin=https://www.softwareheritage.org/check-deposit-2020-10-08T13:52:34.509655;visit=swh:1:snp:c477c6ef51833127b13a86ece7d75e5b3cc4e93d;anchor=swh:1:rev:f26f3960c175f15f6e24200171d446b86f6f7230;path=/ + +