diff --git a/PKG-INFO b/PKG-INFO index 31e9bbd..ad6ab5d 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,48 +1,48 @@ Metadata-Version: 2.1 Name: swh.web.client -Version: 0.2.5 +Version: 0.3.0 Summary: Software Heritage Web client Home-page: https://forge.softwareheritage.org/source/swh-web-client/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-web-client Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-web-client/ Description: Software Heritage - Web client ============================== Client for Software Heritage Web applications, via their APIs. Sample usage ------------ .. code-block:: python from swh.web.client.client import WebAPIClient cli = WebAPIClient() # retrieve any archived object via its SWHID cli.get('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') # same, but for specific object types cli.revision('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') # get() always retrieve entire objects, following pagination # WARNING: this might *not* be what you want for large objects cli.get('swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a') # type-specific methods support explicit iteration through pages next(cli.snapshot('swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764')) Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing diff --git a/debian/changelog b/debian/changelog index 39dc174..4e1b069 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,64 +1,68 @@ -swh-web-client (0.2.5-1~swh1~bpo10+1) buster-swh; urgency=medium +swh-web-client (0.3.0-1~swh1) unstable-swh; urgency=medium - * Rebuild for buster-swh + * New upstream release 0.3.0 - (tagged by Antoine R. Dumont + (@ardumont) on 2021-04-21 11:36:52 + +0200) + * Upstream changes: - v0.3.0 - Add new `swh save submit- + request` cli to batch save code now requests - -- Software Heritage autobuilder (on jenkins-debian1) Thu, 04 Mar 2021 17:23:27 +0000 + -- Software Heritage autobuilder (on jenkins-debian1) Wed, 21 Apr 2021 09:38:18 +0000 swh-web-client (0.2.5-1~swh1) unstable-swh; urgency=medium * New upstream release 0.2.5 - (tagged by Nicolas Dandrimont on 2021-03-04 18:20:23 +0100) * Upstream changes: - Release swh.web.client 0.2.5 - Compatibility with swh.model 1.0.1 -- Software Heritage autobuilder (on jenkins-debian1) Thu, 04 Mar 2021 17:22:34 +0000 swh-web-client (0.2.4-1~swh1) unstable-swh; urgency=medium * New upstream release 0.2.4 - (tagged by Stefano Zacchiroli on 2020-12-18 17:13:44 +0100) * Upstream changes: - v0.2.4 / 2020-12-18 - * docs: index: update shell examples with recent CLI changes - * cli: add -- config-file option to 'swh web' group - * cli: move 'swh auth' group to 'swh web auth' - * cli: add 'swh web search' subcommand - * client: add origin_search() method -- Software Heritage autobuilder (on jenkins-debian1) Fri, 18 Dec 2020 16:15:21 +0000 swh-web-client (0.2.3-1~swh1) unstable-swh; urgency=medium * New upstream release 0.2.3 - (tagged by Nicolas Dandrimont on 2020-12-03 15:33:13 +0100) * Upstream changes: - Release swh.web.client 0.2.3 - Add an origin_exists method -- Software Heritage autobuilder (on jenkins-debian1) Thu, 03 Dec 2020 14:35:00 +0000 swh-web-client (0.2.2-1~swh1) unstable-swh; urgency=medium * New upstream release 0.2.2 - (tagged by Stefano Zacchiroli on 2020-11-09 20:37:26 +0100) * Upstream changes: - v0.2.2 / 2020-11-09 - * client: bind /known API endpoint to verify for object presence - * cli: Use more explicit naming for auth subcommands - * client: support optional date in Revision and Release -- Software Heritage autobuilder (on jenkins-debian1) Mon, 09 Nov 2020 19:38:57 +0000 swh-web-client (0.2.1-1~swh1) unstable-swh; urgency=medium * New upstream release 0.2.1 - (tagged by Stefano Zacchiroli on 2020-10-06 17:00:10 +0200) * Upstream changes: - v0.2.1 / 2020-10-06 - * client: make API response typify-ing optional - * tox.ini: pin black to the pre- commit version (19.10b0) to avoid flip-flops - * Remove stale Debian packaging from the master branch - * Run isort after the CLI import changes -- Software Heritage autobuilder (on jenkins-debian1) Tue, 06 Oct 2020 15:02:31 +0000 swh-web-client (0.2.0-1~swh1) unstable-swh; urgency=low * Bootstrap debian packaging for swh-web-client -- Nicolas Dandrimont Fri, 25 Sep 2020 19:35:20 +0200 diff --git a/swh.web.client.egg-info/PKG-INFO b/swh.web.client.egg-info/PKG-INFO index 31e9bbd..ad6ab5d 100644 --- a/swh.web.client.egg-info/PKG-INFO +++ b/swh.web.client.egg-info/PKG-INFO @@ -1,48 +1,48 @@ Metadata-Version: 2.1 Name: swh.web.client -Version: 0.2.5 +Version: 0.3.0 Summary: Software Heritage Web client Home-page: https://forge.softwareheritage.org/source/swh-web-client/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-web-client Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-web-client/ Description: Software Heritage - Web client ============================== Client for Software Heritage Web applications, via their APIs. Sample usage ------------ .. code-block:: python from swh.web.client.client import WebAPIClient cli = WebAPIClient() # retrieve any archived object via its SWHID cli.get('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') # same, but for specific object types cli.revision('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') # get() always retrieve entire objects, following pagination # WARNING: this might *not* be what you want for large objects cli.get('swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a') # type-specific methods support explicit iteration through pages next(cli.snapshot('swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764')) Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing diff --git a/swh.web.client.egg-info/SOURCES.txt b/swh.web.client.egg-info/SOURCES.txt index c69ffff..6059115 100644 --- a/swh.web.client.egg-info/SOURCES.txt +++ b/swh.web.client.egg-info/SOURCES.txt @@ -1,45 +1,46 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile README.rst api_data.py mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/README.rst docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.web.client.egg-info/PKG-INFO swh.web.client.egg-info/SOURCES.txt swh.web.client.egg-info/dependency_links.txt swh.web.client.egg-info/entry_points.txt swh.web.client.egg-info/requires.txt swh.web.client.egg-info/top_level.txt swh/web/__init__.py swh/web/client/__init__.py swh/web/client/auth.py swh/web/client/cli.py swh/web/client/client.py swh/web/client/py.typed swh/web/client/tests/__init__.py swh/web/client/tests/api_data.py +swh/web/client/tests/api_data_static.py swh/web/client/tests/conftest.py swh/web/client/tests/gen-api-data.sh swh/web/client/tests/test_cli.py swh/web/client/tests/test_web_api_client.py \ No newline at end of file diff --git a/swh/web/client/cli.py b/swh/web/client/cli.py index dc6d739..29c9186 100644 --- a/swh/web/client/cli.py +++ b/swh/web/client/cli.py @@ -1,230 +1,282 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Any, Dict, List # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from click.core import Context from swh.core.cli import swh as swh_cli_group CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) # TODO (T1410): All generic config code should reside in swh.core.config DEFAULT_CONFIG_PATH = os.environ.get( "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml") ) DEFAULT_CONFIG: Dict[str, Any] = { "api_url": "https://archive.softwareheritage.org/api/1", "bearer_token": None, } @swh_cli_group.group(name="web", context_settings=CONTEXT_SETTINGS) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=True, dir_okay=False, path_type=str), help=f"Configuration file (default: {DEFAULT_CONFIG_PATH})", ) @click.pass_context def web(ctx: Context, config_file: str): """Software Heritage web client""" import logging from swh.core import config from swh.web.client.client import WebAPIClient if not config_file: config_file = DEFAULT_CONFIG_PATH try: conf = config.read_raw_config(config.config_basepath(config_file)) if not conf: raise ValueError(f"Cannot parse configuration file: {config_file}") + # TODO: Determine what the following conditional is for if config_file == DEFAULT_CONFIG_PATH: try: conf = conf["swh"]["web"]["client"] except KeyError: pass # recursive merge not done by config.read conf = config.merge_configs(DEFAULT_CONFIG, conf) except Exception: logging.warning( "Using default configuration (cannot load custom one)", exc_info=True ) conf = DEFAULT_CONFIG ctx.ensure_object(dict) ctx.obj["client"] = WebAPIClient(conf["api_url"], conf["bearer_token"]) @web.command(name="search") @click.argument( "query", required=True, nargs=-1, metavar="KEYWORD...", ) @click.option( "--limit", "limit", type=int, default=10, show_default=True, help="maximum number of results to show", ) @click.option( "--only-visited", is_flag=True, show_default=True, help="if true, only return origins with at least one visit by Software heritage", ) @click.option( "--url-encode/--no-url-encode", default=False, show_default=True, help="if true, escape origin URLs in results with percent encoding (RFC 3986)", ) @click.pass_context def search( ctx: Context, query: List[str], limit: int, only_visited: bool, url_encode: bool, ): """Search a query (as a list of keywords) into the Software Heritage archive. The search results are printed to CSV format, one result per line, using a tabulation as the field delimiter. """ import logging import sys import urllib.parse import requests client = ctx.obj["client"] keywords = " ".join(query) try: results = client.origin_search(keywords, limit, only_visited) for result in results: if url_encode: result["url"] = urllib.parse.quote_plus(result["url"]) print("\t".join(result.values())) except requests.HTTPError as err: logging.error("Could not retrieve search results: %s", err) except (BrokenPipeError, IOError): # Get rid of the BrokenPipeError message sys.stderr.close() +@web.group(name="save", context_settings=CONTEXT_SETTINGS) +@click.pass_context +def savecodenow(ctx: Context,): + """Subcommand to interact from the cli with the save code now feature + + """ + pass + + +@savecodenow.command("submit-request") +@click.option("--delimiter", "-d", default=",") +@click.pass_context +def submit_request(ctx, delimiter: str) -> None: + """Submit new save code now request through cli pipe. The expected format of the request + if one csv row ``,``. + + Example: + + cat list-origins | swh web save submit-request + + echo svn;https://svn-url\ngit;https://git-url | swh web save \ + submit-request --delimiter ';' + + Prints: + The output of save code now requests as json output. + + """ + import json + import logging + import sys + + logging.basicConfig(level=logging.INFO, stream=sys.stderr) + + client = ctx.obj["client"] + + processed_origins = [] + for origin in sys.stdin: + visit_type, origin = origin.rstrip().split(delimiter) + + try: + saved_origin = client.origin_save(visit_type, origin) + logging.info("Submitted origin (%s, %s)", visit_type, origin) + processed_origins.append(saved_origin) + except Exception as e: + logging.warning( + "Issue for origin (%s, %s)\n%s", origin, visit_type, e, + ) + logging.debug("Origin saved: %s", len(processed_origins)) + print(json.dumps(processed_origins)) + + @web.group(name="auth", context_settings=CONTEXT_SETTINGS) @click.option( "--oidc-server-url", "oidc_server_url", default="https://auth.softwareheritage.org/auth/", help=( "URL of OpenID Connect server (default to " '"https://auth.softwareheritage.org/auth/")' ), ) @click.option( "--realm-name", "realm_name", default="SoftwareHeritage", help=( "Name of the OpenID Connect authentication realm " '(default to "SoftwareHeritage")' ), ) @click.option( "--client-id", "client_id", default="swh-web", help=("OpenID Connect client identifier in the realm " '(default to "swh-web")'), ) @click.pass_context def auth(ctx: Context, oidc_server_url: str, realm_name: str, client_id: str): """ Authenticate Software Heritage users with OpenID Connect. This CLI tool eases the retrieval of a bearer token to authenticate a user querying the Software Heritage Web API. """ from swh.web.client.auth import OpenIDConnectSession ctx.ensure_object(dict) ctx.obj["oidc_session"] = OpenIDConnectSession( oidc_server_url, realm_name, client_id ) @auth.command("generate-token") @click.argument("username") @click.pass_context def generate_token(ctx: Context, username: str): """ Generate a new bearer token for Web API authentication. Login with USERNAME, create a new OpenID Connect session and get bearer token. User will be prompted for his password and token will be printed to standard output. The created OpenID Connect session is an offline one so the provided token has a much longer expiration time than classical OIDC sessions (usually several dozens of days). """ from getpass import getpass password = getpass() oidc_info = ctx.obj["oidc_session"].login(username, password) if "refresh_token" in oidc_info: print(oidc_info["refresh_token"]) else: print(oidc_info) @auth.command("login", deprecated=True) @click.argument("username") @click.pass_context def login(ctx: Context, username: str): """ Alias for 'generate-token' """ ctx.forward(generate_token) @auth.command("revoke-token") @click.argument("token") @click.pass_context def revoke_token(ctx: Context, token: str): """ Revoke a bearer token used for Web API authentication. Use TOKEN to logout from an offline OpenID Connect session. The token is definitely revoked after that operation. """ ctx.obj["oidc_session"].logout(token) print("Token successfully revoked.") @auth.command("logout", deprecated=True) @click.argument("token") @click.pass_context def logout(ctx: Context, token: str): """ Alias for 'revoke-token' """ ctx.forward(revoke_token) diff --git a/swh/web/client/client.py b/swh/web/client/client.py index 9a1559d..ea669e1 100644 --- a/swh/web/client/client.py +++ b/swh/web/client/client.py @@ -1,610 +1,628 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Python client for the Software Heritage Web API Light wrapper around requests for the archive API, taking care of data conversions and pagination. .. code-block:: python from swh.web.client.client import WebAPIClient cli = WebAPIClient() # retrieve any archived object via its SWHID cli.get('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') # same, but for specific object types cli.revision('swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6') # get() always retrieve entire objects, following pagination # WARNING: this might *not* be what you want for large objects cli.get('swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a') # type-specific methods support explicit iteration through pages next(cli.snapshot('swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764')) """ from datetime import datetime from typing import Any, Callable, Dict, Iterator, List, Optional, Union from urllib.parse import urlparse import dateutil.parser import requests from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.identifiers import CoreSWHID, ObjectType from swh.web.client.cli import DEFAULT_CONFIG SWHIDish = Union[CoreSWHID, str] CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" RELEASE = "release" SNAPSHOT = "snapshot" ORIGIN_VISIT = "origin_visit" ORIGIN = "origin" def _get_object_id_hex(swhidish: SWHIDish) -> str: """Parse string or SWHID and return the hex value of the object_id""" if isinstance(swhidish, str): swhid = CoreSWHID.from_string(swhidish) else: swhid = swhidish return hash_to_hex(swhid.object_id) def typify_json(data: Any, obj_type: str) -> Any: """Type API responses using pythonic types where appropriate The following conversions are performed: - identifiers are converted from strings to SWHID instances - timestamps are converted from strings to datetime.datetime objects """ def to_swhid(object_type: Union[str, ObjectType], s: Any) -> CoreSWHID: if isinstance(object_type, str): parsed_object_type = ObjectType[object_type.upper()] else: parsed_object_type = object_type return CoreSWHID(object_type=parsed_object_type, object_id=hash_to_bytes(s)) def to_date(date: str) -> datetime: return dateutil.parser.parse(date) def to_optional_date(date: Optional[str]) -> Optional[datetime]: return None if date is None else to_date(date) # The date attribute is optional for Revision and Release object def obj_type_of_entry_type(s): if s == "file": return ObjectType.CONTENT elif s == "dir": return ObjectType.DIRECTORY elif s == "rev": return ObjectType.REVISION else: raise ValueError(f"invalid directory entry type: {s}") if obj_type == SNAPSHOT: for name, target in data.items(): if target["target_type"] != "alias": # alias targets do not point to objects via SWHIDs; others do target["target"] = to_swhid(target["target_type"], target["target"]) elif obj_type == REVISION: data["id"] = to_swhid(obj_type, data["id"]) data["directory"] = to_swhid(DIRECTORY, data["directory"]) for key in ("date", "committer_date"): data[key] = to_optional_date(data[key]) for parent in data["parents"]: parent["id"] = to_swhid(REVISION, parent["id"]) elif obj_type == RELEASE: data["id"] = to_swhid(obj_type, data["id"]) data["date"] = to_optional_date(data["date"]) data["target"] = to_swhid(data["target_type"], data["target"]) elif obj_type == DIRECTORY: dir_swhid = None for entry in data: dir_swhid = dir_swhid or to_swhid(obj_type, entry["dir_id"]) entry["dir_id"] = dir_swhid entry["target"] = to_swhid( obj_type_of_entry_type(entry["type"]), entry["target"] ) elif obj_type == CONTENT: pass # nothing to do for contents elif obj_type == ORIGIN_VISIT: data["date"] = to_date(data["date"]) if data["snapshot"] is not None: data["snapshot"] = to_swhid("snapshot", data["snapshot"]) else: raise ValueError(f"invalid object type: {obj_type}") return data class WebAPIClient: """Client for the Software Heritage archive Web API, see https://archive.softwareheritage.org/api/ """ def __init__( self, api_url: str = DEFAULT_CONFIG["api_url"], bearer_token: Optional[str] = DEFAULT_CONFIG["bearer_token"], ): """Create a client for the Software Heritage Web API See: https://archive.softwareheritage.org/api/ Args: api_url: base URL for API calls bearer_token: optional bearer token to do authenticated API calls """ api_url = api_url.rstrip("/") u = urlparse(api_url) self.api_url = api_url self.api_path = u.path self.bearer_token = bearer_token self._getters: Dict[ObjectType, Callable[[SWHIDish, bool], Any]] = { ObjectType.CONTENT: self.content, ObjectType.DIRECTORY: self.directory, ObjectType.RELEASE: self.release, ObjectType.REVISION: self.revision, ObjectType.SNAPSHOT: self._get_snapshot, } def _call( self, query: str, http_method: str = "get", **req_args ) -> requests.models.Response: """Dispatcher for archive API invocation Args: query: API method to be invoked, rooted at api_url http_method: HTTP method to be invoked, one of: 'get', 'head' req_args: extra keyword arguments for requests.get()/.head() Raises: requests.HTTPError: if HTTP request fails and http_method is 'get' """ url = None if urlparse(query).scheme: # absolute URL url = query else: # relative URL; prepend base API URL url = "/".join([self.api_url, query]) r = None headers = {} if self.bearer_token is not None: headers = {"Authorization": f"Bearer {self.bearer_token}"} if http_method == "get": r = requests.get(url, **req_args, headers=headers) r.raise_for_status() elif http_method == "post": r = requests.post(url, **req_args, headers=headers) r.raise_for_status() elif http_method == "head": r = requests.head(url, **req_args, headers=headers) else: raise ValueError(f"unsupported HTTP method: {http_method}") return r def _get_snapshot(self, swhid: SWHIDish, typify: bool = True) -> Dict[str, Any]: """Analogous to self.snapshot(), but zipping through partial snapshots, merging them together before returning """ snapshot = {} for snp in self.snapshot(swhid, typify): snapshot.update(snp) return snapshot def get(self, swhid: SWHIDish, typify: bool = True, **req_args) -> Any: """Retrieve information about an object of any kind Dispatcher method over the more specific methods content(), directory(), etc. Note that this method will buffer the entire output in case of long, iterable output (e.g., for snapshot()), see the iter() method for streaming. """ if isinstance(swhid, str): obj_type = CoreSWHID.from_string(swhid).object_type else: obj_type = swhid.object_type return self._getters[obj_type](swhid, typify) def iter( self, swhid: SWHIDish, typify: bool = True, **req_args ) -> Iterator[Dict[str, Any]]: """Stream over the information about an object of any kind Streaming variant of get() """ if isinstance(swhid, str): obj_type = CoreSWHID.from_string(swhid).object_type else: obj_type = swhid.object_type if obj_type == SNAPSHOT: yield from self.snapshot(swhid, typify) elif obj_type == REVISION: yield from [self.revision(swhid, typify)] elif obj_type == RELEASE: yield from [self.release(swhid, typify)] elif obj_type == DIRECTORY: yield from self.directory(swhid, typify) elif obj_type == CONTENT: yield from [self.content(swhid, typify)] else: raise ValueError(f"invalid object type: {obj_type}") def content( self, swhid: SWHIDish, typify: bool = True, **req_args ) -> Dict[str, Any]: """Retrieve information about a content object Args: swhid: object persistent identifier typify: if True, convert return value to pythonic types wherever possible, otherwise return raw JSON types (default: True) req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ json = self._call( f"content/sha1_git:{_get_object_id_hex(swhid)}/", **req_args ).json() return typify_json(json, CONTENT) if typify else json def directory( self, swhid: SWHIDish, typify: bool = True, **req_args ) -> List[Dict[str, Any]]: """Retrieve information about a directory object Args: swhid: object persistent identifier typify: if True, convert return value to pythonic types wherever possible, otherwise return raw JSON types (default: True) req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ json = self._call(f"directory/{_get_object_id_hex(swhid)}/", **req_args).json() return typify_json(json, DIRECTORY) if typify else json def revision( self, swhid: SWHIDish, typify: bool = True, **req_args ) -> Dict[str, Any]: """Retrieve information about a revision object Args: swhid: object persistent identifier typify: if True, convert return value to pythonic types wherever possible, otherwise return raw JSON types (default: True) req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ json = self._call(f"revision/{_get_object_id_hex(swhid)}/", **req_args).json() return typify_json(json, REVISION) if typify else json def release( self, swhid: SWHIDish, typify: bool = True, **req_args ) -> Dict[str, Any]: """Retrieve information about a release object Args: swhid: object persistent identifier typify: if True, convert return value to pythonic types wherever possible, otherwise return raw JSON types (default: True) req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ json = self._call(f"release/{_get_object_id_hex(swhid)}/", **req_args).json() return typify_json(json, RELEASE) if typify else json def snapshot( self, swhid: SWHIDish, typify: bool = True, **req_args ) -> Iterator[Dict[str, Any]]: """Retrieve information about a snapshot object Args: swhid: object persistent identifier typify: if True, convert return value to pythonic types wherever possible, otherwise return raw JSON types (default: True) req_args: extra keyword arguments for requests.get() Returns: an iterator over partial snapshots (dictionaries mapping branch names to information about where they point to), each containing a subset of available branches Raises: requests.HTTPError: if HTTP request fails """ done = False r = None query = f"snapshot/{_get_object_id_hex(swhid)}/" while not done: r = self._call(query, http_method="get", **req_args) json = r.json()["branches"] yield typify_json(json, SNAPSHOT) if typify else json if "next" in r.links and "url" in r.links["next"]: query = r.links["next"]["url"] else: done = True def visits( self, origin: str, per_page: Optional[int] = None, last_visit: Optional[int] = None, typify: bool = True, **req_args, ) -> Iterator[Dict[str, Any]]: """List visits of an origin Args: origin: the URL of a software origin per_page: the number of visits to list last_visit: visit to start listing from typify: if True, convert return value to pythonic types wherever possible, otherwise return raw JSON types (default: True) req_args: extra keyword arguments for requests.get() Returns: an iterator over visits of the origin Raises: requests.HTTPError: if HTTP request fails """ done = False r = None params = [] if last_visit is not None: params.append(("last_visit", last_visit)) if per_page is not None: params.append(("per_page", per_page)) query = f"origin/{origin}/visits/" while not done: r = self._call(query, http_method="get", params=params, **req_args) yield from [typify_json(v, ORIGIN_VISIT) if typify else v for v in r.json()] if "next" in r.links and "url" in r.links["next"]: params = [] query = r.links["next"]["url"] else: done = True def known( self, swhids: Iterator[SWHIDish], **req_args ) -> Dict[CoreSWHID, Dict[Any, Any]]: """Verify the presence in the archive of several objects at once Args: swhids: SWHIDs of the objects to verify Returns: a dictionary mapping object SWHIDs to archive information about them; the dictionary includes a "known" key associated to a boolean value that is true if and only if the object is known to the archive Raises: requests.HTTPError: if HTTP request fails """ r = self._call( "known/", http_method="post", json=list(map(str, swhids)), **req_args ) return {CoreSWHID.from_string(k): v for k, v in r.json().items()} def content_exists(self, swhid: SWHIDish, **req_args) -> bool: """Check if a content object exists in the archive Args: swhid: object persistent identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call( f"content/sha1_git:{_get_object_id_hex(swhid)}/", http_method="head", **req_args, ) ) def directory_exists(self, swhid: SWHIDish, **req_args) -> bool: """Check if a directory object exists in the archive Args: swhid: object persistent identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call( f"directory/{_get_object_id_hex(swhid)}/", http_method="head", **req_args, ) ) def revision_exists(self, swhid: SWHIDish, **req_args) -> bool: """Check if a revision object exists in the archive Args: swhid: object persistent identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call( f"revision/{_get_object_id_hex(swhid)}/", http_method="head", **req_args, ) ) def release_exists(self, swhid: SWHIDish, **req_args) -> bool: """Check if a release object exists in the archive Args: swhid: object persistent identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call( f"release/{_get_object_id_hex(swhid)}/", http_method="head", **req_args, ) ) def snapshot_exists(self, swhid: SWHIDish, **req_args) -> bool: """Check if a snapshot object exists in the archive Args: swhid: object persistent identifier req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call( f"snapshot/{_get_object_id_hex(swhid)}/", http_method="head", **req_args, ) ) def origin_exists(self, origin: str, **req_args) -> bool: """Check if an origin object exists in the archive Args: origin: the URL of a software origin req_args: extra keyword arguments for requests.head() Raises: requests.HTTPError: if HTTP request fails """ return bool( self._call(f"origin/{origin}/get/", http_method="head", **req_args,) ) def content_raw(self, swhid: SWHIDish, **req_args) -> Iterator[bytes]: """Iterate over the raw content of a content object Args: swhid: object persistent identifier req_args: extra keyword arguments for requests.get() Raises: requests.HTTPError: if HTTP request fails """ r = self._call( f"content/sha1_git:{_get_object_id_hex(swhid)}/raw/", stream=True, **req_args, ) r.raise_for_status() yield from r.iter_content(chunk_size=None, decode_unicode=False) def origin_search( self, query: str, limit: Optional[int] = None, with_visit: bool = False, **req_args, ) -> Iterator[Dict[str, Any]]: """List origin search results Args: query: search keywords limit: the maximum number of found origins to return with_visit: if true, only return origins with at least one visit Returns: an iterator over search results Raises: requests.HTTPError: if HTTP request fails """ params = [] if limit is not None: params.append(("limit", limit)) if with_visit: params.append(("with_visit", True)) done = False nb_returned = 0 q = f"origin/search/{query}/" while not done: r = self._call(q, params=params, **req_args) json = r.json() if limit and nb_returned + len(json) > limit: json = json[: limit - nb_returned] nb_returned += len(json) yield from json if limit and nb_returned == limit: done = True if "next" in r.links and "url" in r.links["next"]: params = [] q = r.links["next"]["url"] else: done = True + + def origin_save(self, visit_type: str, origin: str) -> Dict: + """Save code now query for the origin with visit_type. + + Args: + visit_type: Type of the visit + origin: the origin to save + + Returns: + The resulting dict of the visit saved + + Raises: + requests.HTTPError: if HTTP request fails + + """ + q = f"origin/save/{visit_type}/url/{origin}/" + r = self._call(q, http_method="post") + return r.json() diff --git a/swh/web/client/tests/api_data_static.py b/swh/web/client/tests/api_data_static.py new file mode 100644 index 0000000..965d1dc --- /dev/null +++ b/swh/web/client/tests/api_data_static.py @@ -0,0 +1,38 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +API_DATA_STATIC = { + "origin/save/git/url/https://gitlab.org/gazelle/itest/": r""" + { + "visit_type": "git", + "origin_url": "https://gitlab.org/gazelle/itest", + "save_request_date": "2021-04-20T11:34:38.752929+00:00", + "save_request_status": "accepted", + "save_task_status": "not yet scheduled", + "visit_date": null + } + """, + "origin/save/git/url/https://git.renater.fr/anonscm/git/6po/6po.git/": r""" + { + "visit_type": "git", + "origin_url": "https://git.renater.fr/anonscm/git/6po/6po.git", + "save_request_date": "2021-04-20T11:34:40.115226+00:00", + "save_request_status": "accepted", + "save_task_status": "not yet scheduled", + "visit_date": null + } + """, + "origin/save/git/url/https://github.com/colobot/colobot/": r""" + { + "visit_type": "git", + "origin_url": "https://github.com/colobot/colobot", + "save_request_date": "2021-04-20T11:40:47.667492+00:00", + "save_request_status": "accepted", + "save_task_status": "not yet scheduled", + "visit_date": null + } + """, +} diff --git a/swh/web/client/tests/conftest.py b/swh/web/client/tests/conftest.py index 4c8e831..90e8943 100644 --- a/swh/web/client/tests/conftest.py +++ b/swh/web/client/tests/conftest.py @@ -1,51 +1,83 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import os + import pytest +import yaml from swh.web.client.client import WebAPIClient from .api_data import API_DATA, API_URL +from .api_data_static import API_DATA_STATIC @pytest.fixture def web_api_mock(requests_mock): # monkey patch URLs that require a special response headers for api_call, data in API_DATA.items(): headers = {} if api_call == "snapshot/cabcc7d7bf639bbe1cc3b41989e1806618dd5764/": # to make the client init and follow pagination headers = { "Link": f'<{API_URL}/{api_call}?branches_count=1000&branches_from=refs/tags/v3.0-rc7>; rel="next"' # NoQA: E501 } elif ( api_call == "origin/https://github.com/NixOS/nixpkgs/visits/?last_visit=50&per_page=10" # NoQA: E501 ): # to make the client follow pagination headers = { "Link": f'<{API_URL}/origin/https://github.com/NixOS/nixpkgs/visits/?last_visit=40&per_page=10>; rel="next"' # NoQA: E501 } requests_mock.get(f"{API_URL}/{api_call}", text=data, headers=headers) def known_callback(request, context): known_swhids = [ "swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1", "swh:1:dir:977fc4b98c0e85816348cebd3b12026407c368b6", "swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6", "swh:1:rel:208f61cc7a5dbc9879ae6e5c2f95891e270f09ef", "swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a", ] return {swhid: {"known": swhid in known_swhids} for swhid in request.json()} requests_mock.register_uri("POST", f"{API_URL}/known/", json=known_callback) + # Add some other post urls to mock + for api_call, data in API_DATA_STATIC.items(): + requests_mock.post(f"{API_URL}/{api_call}", text=data) + return requests_mock @pytest.fixture def web_api_client(): # use the fake base API URL that matches API data return WebAPIClient(api_url=API_URL) + + +@pytest.fixture +def cli_global_config_dict(): + """Define a basic configuration yaml for the cli. + + """ + return { + "api_url": API_URL, + "bearer_token": None, + } + + +@pytest.fixture +def cli_config_path(tmp_path, cli_global_config_dict, monkeypatch): + """Write a global.yml file and writes it in the environment + + """ + config_path = os.path.join(tmp_path, "global.yml") + with open(config_path, "w") as f: + f.write(yaml.dump(cli_global_config_dict)) + monkeypatch.setenv("SWH_CONFIG_FILE", config_path) + + return config_path diff --git a/swh/web/client/tests/test_cli.py b/swh/web/client/tests/test_cli.py index 9021cb0..a7bee0f 100644 --- a/swh/web/client/tests/test_cli.py +++ b/swh/web/client/tests/test_cli.py @@ -1,54 +1,115 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import json +import os + from click.testing import CliRunner -from swh.web.client.cli import auth +from swh.web.client.cli import auth, web runner = CliRunner() oidc_profile = { "access_token": "some-access-token", "expires_in": 600, "refresh_expires_in": 0, "refresh_token": "some-refresh-token", "token_type": "bearer", "session_state": "some-state", "scope": "openid email profile offline_access", } def test_auth_generate_token(mocker): mock_getpass = mocker.patch("getpass.getpass") mock_getpass.return_value = "password" mock_oidc_session = mocker.patch("swh.web.client.auth.OpenIDConnectSession") mock_login = mock_oidc_session.return_value.login mock_login.return_value = oidc_profile for command in ("generate-token", "login"): mock_login.side_effect = None result = runner.invoke(auth, [command, "username"], input="password\n") assert result.exit_code == 0 assert oidc_profile["refresh_token"] in result.output mock_login.side_effect = Exception("Auth error") result = runner.invoke(auth, [command, "username"], input="password\n") assert result.exit_code == 1 def test_auth_revoke_token(mocker): mock_oidc_session = mocker.patch("swh.web.client.auth.OpenIDConnectSession") mock_logout = mock_oidc_session.return_value.logout for command in ("revoke-token", "logout"): mock_logout.side_effect = None result = runner.invoke(auth, [command, oidc_profile["refresh_token"]]) assert result.exit_code == 0 mock_logout.side_effect = Exception("Auth error") result = runner.invoke(auth, [command, oidc_profile["refresh_token"]]) assert result.exit_code == 1 + + +def test_save_code_now_through_cli(mocker, web_api_mock, tmp_path, cli_config_path): + """Trigger save code now from the cli creates new save code now requests""" + origins = [ + ("git", "https://gitlab.org/gazelle/itest"), + ("git", "https://git.renater.fr/anonscm/git/6po/6po.git"), + ("git", "https://github.com/colobot/colobot"), + # this will be rejected + ("tig", "invalid-and-refusing-to-save-this"), + ] + origins_csv = "\n".join(map(lambda t: ",".join(t), origins)) + origins_csv = f"{origins_csv}\n" + + temp_file = os.path.join(tmp_path, "tmp.csv") + with open(temp_file, "w") as f: + f.write(origins_csv) + + with open(temp_file, "r") as f: + result = runner.invoke( + web, + ["--config-file", cli_config_path, "save", "submit-request"], + input=f, + catch_exceptions=False, + ) + + assert result.exit_code == 0, f"Unexpected output: {result.output}" + actual_save_requests = json.loads(result.output.strip()) + assert len(actual_save_requests) == 3 + + expected_save_requests = [ + { + "origin_url": "https://gitlab.org/gazelle/itest", + "save_request_date": "2021-04-20T11:34:38.752929+00:00", + "save_request_status": "accepted", + "save_task_status": "not yet scheduled", + "visit_date": None, + "visit_type": "git", + }, + { + "origin_url": "https://git.renater.fr/anonscm/git/6po/6po.git", + "save_request_date": "2021-04-20T11:34:40.115226+00:00", + "save_request_status": "accepted", + "save_task_status": "not yet scheduled", + "visit_date": None, + "visit_type": "git", + }, + { + "origin_url": "https://github.com/colobot/colobot", + "save_request_date": "2021-04-20T11:40:47.667492+00:00", + "save_request_status": "accepted", + "save_task_status": "not yet scheduled", + "visit_date": None, + "visit_type": "git", + }, + ] + for actual_save_request in actual_save_requests: + assert actual_save_request in expected_save_requests diff --git a/swh/web/client/tests/test_web_api_client.py b/swh/web/client/tests/test_web_api_client.py index 62c2d26..cd9e335 100644 --- a/swh/web/client/tests/test_web_api_client.py +++ b/swh/web/client/tests/test_web_api_client.py @@ -1,235 +1,253 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from dateutil.parser import parse as parse_date +import pytest from swh.model.identifiers import REVISION, CoreSWHID from swh.web.client.client import typify_json from .api_data import API_DATA def test_get_content(web_api_client, web_api_mock): swhid = CoreSWHID.from_string("swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1") obj = web_api_client.get(swhid) assert obj["length"] == 151810 for key in ("length", "status", "checksums", "data_url"): assert key in obj assert obj["checksums"]["sha1_git"] == str(swhid).split(":")[3] assert obj["checksums"]["sha1"] == "dc2830a9e72f23c1dfebef4413003221baa5fb62" assert obj == web_api_client.content(swhid) def test_get_directory(web_api_client, web_api_mock): swhid = CoreSWHID.from_string("swh:1:dir:977fc4b98c0e85816348cebd3b12026407c368b6") obj = web_api_client.get(swhid) assert len(obj) == 35 # number of directory entries assert all(map(lambda entry: entry["dir_id"] == swhid, obj)) dir_entry = obj[0] assert dir_entry["type"] == "file" assert dir_entry["target"] == CoreSWHID.from_string( "swh:1:cnt:58471109208922c9ee8c4b06135725f03ed16814" ) assert dir_entry["name"] == ".bzrignore" assert dir_entry["length"] == 582 assert obj == web_api_client.directory(swhid) def test_get_release(web_api_client, web_api_mock): swhid = CoreSWHID.from_string("swh:1:rel:b9db10d00835e9a43e2eebef2db1d04d4ae82342") obj = web_api_client.get(swhid) assert obj["id"] == swhid assert obj["author"]["fullname"] == "Paul Tagliamonte " assert obj["author"]["name"] == "Paul Tagliamonte" assert obj["date"] == parse_date("2013-07-06T19:34:11-04:00") assert obj["name"] == "0.9.9" assert obj["target_type"] == "revision" assert obj["target"] == CoreSWHID.from_string( "swh:1:rev:e005cb773c769436709ca6a1d625dc784dbc1636" ) assert not obj["synthetic"] assert obj == web_api_client.release(swhid) def test_get_revision(web_api_client, web_api_mock): swhid = CoreSWHID.from_string("swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6") obj = web_api_client.get(swhid) assert obj["id"] == swhid for role in ("author", "committer"): assert ( obj[role]["fullname"] == "Nicolas Dandrimont " ) assert obj[role]["name"] == "Nicolas Dandrimont" timestamp = parse_date("2014-08-18T18:18:25+02:00") assert obj["date"] == timestamp assert obj["committer_date"] == timestamp assert obj["message"].startswith("Merge branch") assert obj["merge"] assert len(obj["parents"]) == 2 assert obj["parents"][0]["id"] == CoreSWHID.from_string( "swh:1:rev:26307d261279861c2d9c9eca3bb38519f951bea4" ) assert obj["parents"][1]["id"] == CoreSWHID.from_string( "swh:1:rev:37fc9e08d0c4b71807a4f1ecb06112e78d91c283" ) assert obj == web_api_client.revision(swhid) def test_get_snapshot(web_api_client, web_api_mock): # small snapshot, the one from Web API doc swhid = CoreSWHID.from_string("swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a") obj = web_api_client.get(swhid) assert len(obj) == 4 assert obj["refs/heads/master"]["target_type"] == "revision" assert obj["refs/heads/master"]["target"] == CoreSWHID.from_string( "swh:1:rev:83c20a6a63a7ebc1a549d367bc07a61b926cecf3" ) assert obj["refs/tags/dpkt-1.7"]["target_type"] == "revision" assert obj["refs/tags/dpkt-1.7"]["target"] == CoreSWHID.from_string( "swh:1:rev:0c9dbfbc0974ec8ac1d8253aa1092366a03633a8" ) def test_iter_snapshot(web_api_client, web_api_mock): # large snapshot from the Linux kernel, usually spanning two pages swhid = CoreSWHID.from_string("swh:1:snp:cabcc7d7bf639bbe1cc3b41989e1806618dd5764") obj = web_api_client.snapshot(swhid) snp = {} for partial in obj: snp.update(partial) assert len(snp) == 1391 def test_authentication(web_api_client, web_api_mock): rel_id = "b9db10d00835e9a43e2eebef2db1d04d4ae82342" url = f"{web_api_client.api_url}/release/{rel_id}/" refresh_token = "user-refresh-token" web_api_client.bearer_token = refresh_token swhid = CoreSWHID.from_string(f"swh:1:rel:{rel_id}") web_api_client.get(swhid) sent_request = web_api_mock._adapter.last_request assert sent_request.url == url assert "Authorization" in sent_request.headers assert sent_request.headers["Authorization"] == f"Bearer {refresh_token}" def test_get_visits(web_api_client, web_api_mock): obj = web_api_client.visits( "https://github.com/NixOS/nixpkgs", last_visit=50, per_page=10 ) visits = [v for v in obj] assert len(visits) == 20 timestamp = parse_date("2018-07-31 04:34:23.298931+00:00") assert visits[0]["date"] == timestamp assert visits[0]["snapshot"] is None snapshot_swhid = "swh:1:snp:456550ea74af4e2eecaa406629efaaf0b9b5f976" assert visits[7]["snapshot"] == CoreSWHID.from_string(snapshot_swhid) def test_origin_search(web_api_client, web_api_mock): limited_results = list(web_api_client.origin_search("python", limit=5)) assert len(limited_results) == 5 results = list(web_api_client.origin_search("foo bar baz qux", with_visit=True)) actual_urls = [r["url"] for r in results] actual_visits = [r["origin_visits_url"] for r in results] # Check *some* of the URLS since the search could return more results in the future expected = [ ( "https://github.com/foo-bar-baz-qux/mygithubpage", "https://archive.softwareheritage.org/api/1/origin/https://github.com/foo-bar-baz-qux/mygithubpage/visits/", # NoQA: E501 ), ( "https://www.npmjs.com/package/foo-bar-baz-qux", "https://archive.softwareheritage.org/api/1/origin/https://www.npmjs.com/package/foo-bar-baz-qux/visits/", # NoQA: E501 ), ( "https://bitbucket.org/foobarbazqux/rp.git", "https://archive.softwareheritage.org/api/1/origin/https://bitbucket.org/foobarbazqux/rp.git/visits/", # NoQA: E501 ), ] for (url, visit) in expected: assert url in actual_urls assert visit in actual_visits +@pytest.mark.parametrize( + "visit_type,origin", + [ + ("git", "https://gitlab.org/gazelle/itest"), + ("git", "https://git.renater.fr/anonscm/git/6po/6po.git"), + ("git", "https://github.com/colobot/colobot"), + ], +) +def test_origin_save(visit_type, origin, web_api_client, web_api_mock): + """Post save code now is allowed from the client.""" + save_request = web_api_client.origin_save(visit_type, origin) + + assert save_request is not None + assert save_request["save_request_status"] == "accepted" + assert save_request["visit_date"] is None + + def test_known(web_api_client, web_api_mock): # full list of SWHIDs for which we mock a {known: True} answer known_swhids = [ "swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1", "swh:1:dir:977fc4b98c0e85816348cebd3b12026407c368b6", "swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6", "swh:1:rel:208f61cc7a5dbc9879ae6e5c2f95891e270f09ef", "swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a", ] bogus_swhids = [s[:20] + "c0ffee" + s[26:] for s in known_swhids] all_swhids = known_swhids + bogus_swhids known_res = web_api_client.known(all_swhids) assert {str(k) for k in known_res} == set(all_swhids) for swhid, info in known_res.items(): assert info["known"] == (str(swhid) in known_swhids) def test_get_json(web_api_client, web_api_mock): swhids = [ "swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1", "swh:1:dir:977fc4b98c0e85816348cebd3b12026407c368b6", "swh:1:rel:b9db10d00835e9a43e2eebef2db1d04d4ae82342", "swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6", "swh:1:snp:6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a", ] for swhid in swhids: actual = web_api_client.get(swhid, typify=False) expected = None # Fetch raw JSON data from the generated API_DATA for url, data in API_DATA.items(): object_id = swhid[len("swh:1:XXX:") :] if object_id in url: expected = json.loads(data) # Special case: snapshots response differs slightly from the Web API if swhid.startswith("swh:1:snp:"): expected = expected["branches"] break assert actual == expected def test_typify_json_minimal_revision(): revision_data = { "id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "directory": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "date": None, "committer_date": None, "parents": [], } revision_typed = typify_json(revision_data, REVISION) pid = "swh:1:rev:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" assert revision_typed["id"] == CoreSWHID.from_string(pid) assert revision_typed["date"] is None